java - How to convert this old-API MapReduce job code to the new MapReduce API?


The code below comes from Alex Holmes' Hadoop in Practice, 2nd edition: https://github.com/alexholmes/hiped2/tree/master/src/main/java/hip/ch5/http

The mapper of this MapReduce job reads a text file containing a list of URLs, sends an HTTP request to each URL, and stores the response body in a text file.

The code is written against the old MapReduce API, and I want to convert it to the new MapReduce API. I thought it might be as simple as changing JobConf to Job + Configuration and extending the new Mapper class, but for some reason I can't make the code work.

I'd rather wait to post my modified code, to avoid confusion. The original code is below:

Mapper code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLConnection;

public final class HttpDownloadMap
    implements Mapper<LongWritable, Text, Text, Text> {
  private int file = 0;
  private Configuration conf;
  private String jobOutputDir;
  private String taskId;
  private int connTimeoutMillis =
      DEFAULT_CONNECTION_TIMEOUT_MILLIS;
  private int readTimeoutMillis = DEFAULT_READ_TIMEOUT_MILLIS;
  private final static int DEFAULT_CONNECTION_TIMEOUT_MILLIS = 5000;
  private final static int DEFAULT_READ_TIMEOUT_MILLIS = 5000;

  public static final String CONN_TIMEOUT =
      "httpdownload.connect.timeout.millis";

  public static final String READ_TIMEOUT =
      "httpdownload.read.timeout.millis";

  @Override
  public void configure(JobConf job) {
    conf = job;
    jobOutputDir = job.get("mapred.output.dir");
    taskId = conf.get("mapred.task.id");

    if (conf.get(CONN_TIMEOUT) != null) {
      connTimeoutMillis = Integer.valueOf(conf.get(CONN_TIMEOUT));
    }
    if (conf.get(READ_TIMEOUT) != null) {
      readTimeoutMillis = Integer.valueOf(conf.get(READ_TIMEOUT));
    }
  }

  @Override
  public void map(LongWritable key, Text value,
                  OutputCollector<Text, Text> output,
                  Reporter reporter) throws IOException {
    Path httpDest =
        new Path(jobOutputDir, taskId + "_http_" + (file++));

    InputStream is = null;
    OutputStream os = null;
    try {
      URLConnection connection =
          new URL(value.toString()).openConnection();
      connection.setConnectTimeout(connTimeoutMillis);
      connection.setReadTimeout(readTimeoutMillis);
      is = connection.getInputStream();

      os = FileSystem.get(conf).create(httpDest);

      IOUtils.copyBytes(is, os, conf, true);
    } finally {
      IOUtils.closeStream(is);
      IOUtils.closeStream(os);
    }

    output.collect(new Text(httpDest.toString()), value);
  }

  @Override
  public void close() throws IOException {
  }
}

Job runner/driver code:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public final class HttpDownloadMapReduce {

  public static void main(String... args) throws Exception {
    runJob(args[0], args[1]);
  }

  public static void runJob(String src, String dest)
      throws Exception {
    JobConf job = new JobConf();
    job.setJarByClass(HttpDownloadMap.class);

    FileSystem fs = FileSystem.get(job);
    Path destination = new Path(dest);

    fs.delete(destination, true);

    job.setMapperClass(HttpDownloadMap.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    FileInputFormat.setInputPaths(job, src);
    FileOutputFormat.setOutputPath(job, destination);

    JobClient.runJob(job);
  }
}

Run configuration:

args[0] = "testdata/input/urls.txt"
args[1] = "testdata/output"

urls.txt contains:

http://www.google.com
http://www.yahoo.com

I tried these changes (a rough sketch of my converted code follows this list):

  1. Importing from the org.apache.hadoop.mapreduce package instead of the mapred one.

  2. Changing the old OutputCollector and Reporter to the Context object, since the new API uses Context for writing output.

  3. Changing JobClient to Job and JobConf to Configuration.
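To make that concrete, here is a rough, untested sketch of what I believe the converted mapper should look like (assuming Hadoop 2.x; the class name HttpDownloadMapNewApi is just a placeholder I chose, and I swapped the deprecated mapred.output.dir / mapred.task.id lookups for the equivalent FileOutputFormat.getOutputPath() and context.getTaskAttemptID() calls):

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLConnection;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public final class HttpDownloadMapNewApi
    extends Mapper<LongWritable, Text, Text, Text> {

  public static final String CONN_TIMEOUT =
      "httpdownload.connect.timeout.millis";
  public static final String READ_TIMEOUT =
      "httpdownload.read.timeout.millis";

  private int file = 0;
  private Configuration conf;
  private String jobOutputDir;
  private String taskId;
  private int connTimeoutMillis = 5000;
  private int readTimeoutMillis = 5000;

  @Override
  protected void setup(Context context) {
    // setup(Context) replaces the old configure(JobConf) hook.
    conf = context.getConfiguration();
    jobOutputDir = FileOutputFormat.getOutputPath(context).toString();
    taskId = context.getTaskAttemptID().toString();
    connTimeoutMillis = conf.getInt(CONN_TIMEOUT, connTimeoutMillis);
    readTimeoutMillis = conf.getInt(READ_TIMEOUT, readTimeoutMillis);
  }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // One side file per input URL, written into the job output directory.
    Path httpDest = new Path(jobOutputDir, taskId + "_http_" + (file++));

    InputStream is = null;
    OutputStream os = null;
    try {
      URLConnection connection = new URL(value.toString()).openConnection();
      connection.setConnectTimeout(connTimeoutMillis);
      connection.setReadTimeout(readTimeoutMillis);
      is = connection.getInputStream();
      os = FileSystem.get(conf).create(httpDest);
      IOUtils.copyBytes(is, os, conf, true);
    } finally {
      IOUtils.closeStream(is);
      IOUtils.closeStream(os);
    }

    // context.write() replaces OutputCollector.collect();
    // there is no separate Reporter in the new API.
    context.write(new Text(httpDest.toString()), value);
  }
}

And a sketch of the driver, with JobConf/JobClient replaced by Configuration/Job (again assuming Hadoop 2.x, where Job.getInstance() is available; the job name "http download" is arbitrary):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public final class HttpDownloadMapReduceNewApi {

  public static void main(String... args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "http download");
    job.setJarByClass(HttpDownloadMapNewApi.class);

    // Clear the output directory, as the original driver does.
    FileSystem fs = FileSystem.get(conf);
    Path destination = new Path(args[1]);
    fs.delete(destination, true);

    job.setMapperClass(HttpDownloadMapNewApi.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    // Note: these are the mapreduce.lib.input/output classes,
    // not the old mapred.FileInputFormat/FileOutputFormat.
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, destination);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

The main structural differences are that the new Mapper is an abstract class rather than an interface, so only map() and setup() need overriding, and the single Context object replaces both OutputCollector and Reporter.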

