java - How to convert this old api mapreduce job code to new mapreduce -
The code below comes from Alex Holmes' "Hadoop in Practice", 2nd edition: https://github.com/alexholmes/hiped2/tree/master/src/main/java/hip/ch5/http
This MapReduce mapper reads a text file containing a list of URLs, sends an HTTP request for each one, and stores the response body in a text file.
The code is written against the old MapReduce API, and I want to convert it to the new MapReduce API. It might be as simple as changing JobConf to Job + Configuration and extending the new Mapper class, but for some reason I can't make the code work.
Rather than posting my modified code and causing confusion, I'm posting the original code below:
mapper code:
import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.ioutils; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapred.jobconf; import org.apache.hadoop.mapred.mapper; import org.apache.hadoop.mapred.outputcollector; import org.apache.hadoop.mapred.reporter; import java.io.ioexception; import java.io.inputstream; import java.io.outputstream; import java.net.url; import java.net.urlconnection; public final class httpdownloadmap implements mapper<longwritable, text, text, text> { private int file = 0; private configuration conf; private string joboutputdir; private string taskid; private int conntimeoutmillis = default_connection_timeout_millis; private int readtimeoutmillis = default_read_timeout_millis; private final static int default_connection_timeout_millis = 5000; private final static int default_read_timeout_millis = 5000; public static final string conn_timeout = "httpdownload.connect.timeout.millis"; public static final string read_timeout = "httpdownload.read.timeout.millis"; @override public void configure(jobconf job) { conf = job; joboutputdir = job.get("mapred.output.dir"); taskid = conf.get("mapred.task.id"); if (conf.get(conn_timeout) != null) { conntimeoutmillis = integer.valueof(conf.get(conn_timeout)); } if (conf.get(read_timeout) != null) { readtimeoutmillis = integer.valueof(conf.get(read_timeout)); } } @override public void map(longwritable key, text value, outputcollector<text, text> output, reporter reporter) throws ioexception { path httpdest = new path(joboutputdir, taskid + "_http_" + (file++)); inputstream = null; outputstream os = null; try { urlconnection connection = new url(value.tostring()).openconnection(); connection.setconnecttimeout(conntimeoutmillis); connection.setreadtimeout(readtimeoutmillis); = connection.getinputstream(); os = filesystem.get(conf).create(httpdest); 
ioutils.copybytes(is, os, conf, true); } { ioutils.closestream(is); ioutils.closestream(os); } output.collect(new text(httpdest.tostring()), value); } @override public void close() throws ioexception { } }
job runner/driver code:
import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.text; import org.apache.hadoop.mapred.fileinputformat; import org.apache.hadoop.mapred.fileoutputformat; import org.apache.hadoop.mapred.jobclient; import org.apache.hadoop.mapred.jobconf; public final class httpdownloadmapreduce { public static void main(string... args) throws exception { runjob(args[0], args[1]); } public static void runjob(string src, string dest) throws exception { jobconf job = new jobconf(); job.setjarbyclass(httpdownloadmap.class); filesystem fs = filesystem.get(job); path destination = new path(dest); fs.delete(destination, true); job.setmapperclass(httpdownloadmap.class); job.setmapoutputkeyclass(text.class); job.setmapoutputvalueclass(text.class); fileinputformat.setinputpaths(job, src); fileoutputformat.setoutputpath(job, destination); jobclient.runjob(job); } }
run configurations:
args[0] = "testdata/input/urls.txt" args[1] = "testdata/output"
urls.txt contains:
http://www.google.com http://www.yahoo.com
Try these changes:
1. Import from the `org.apache.hadoop.mapreduce` package instead of the `org.apache.hadoop.mapred` ones, and extend the new `Mapper` base class rather than implementing the old interface.
2. Replace the old `OutputCollector` and `Reporter` parameters with the single `Context` parameter — the new API uses `Context` objects for writing output (`context.write(key, value)`) and for progress reporting. `configure(JobConf)` becomes `setup(Context)`, where the configuration is available via `context.getConfiguration()`.
3. In the driver, replace `JobClient` with `Job` (submit with `job.waitForCompletion(true)`) and `JobConf` with `Configuration` passed to `Job.getInstance(conf)`.
Comments
Post a Comment