1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| package com.matrix.weblog;
import java.io.IOException; import java.net.URI;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * MapReduce driver that pre-processes raw web-log lines.
 *
 * <p>Each input line is parsed with {@code WebLogParser.parser}; records whose
 * bean reports {@code isValid() == false} are dropped, and the remaining beans
 * are written out via their {@code toString()} form as the output key with a
 * {@link NullWritable} value.
 */
public class WeblogPreProcess {

    /** HDFS URI used both as {@code fs.defaultFS} and to open the {@link FileSystem}. */
    private static final String HDFS_URI = "hdfs://192.168.230.10:8020";

    /** Host name configured as {@code yarn.resourcemanager.hostname}. */
    private static final String RESOURCE_MANAGER_HOST = "192.168.230.13";

    /** User name passed to {@link FileSystem#get(URI, Configuration, String)}. */
    private static final String HDFS_USER = "root";

    /** Directory the job reads its input from. */
    private static final String INPUT_DIR = "/usr/matrix/input/weblog/";

    /** Directory the job writes to; removed before every run if it exists. */
    private static final String OUTPUT_DIR = "/usr/matrix/output/weblog";

    /** Name under which the job is submitted. */
    private static final String JOB_NAME = "weblog";

    /**
     * Mapper that emits one output record per valid parsed log line.
     */
    static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        // Reused across map() calls so no new Writable is allocated per record.
        private final Text k = new Text();
        private final NullWritable v = NullWritable.get();

        /**
         * Parses one log line and writes it out if the parsed bean is valid.
         *
         * @param key     input key supplied by the framework (not used here)
         * @param value   one line of raw log text
         * @param context sink for the emitted record
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            WebLogBean webLogBean = WebLogParser.parser(line);

            // NOTE(review): the parser is not visible from here; the null check is a
            // defensive guard in case it returns null for unparseable lines - confirm.
            if (webLogBean == null || !webLogBean.isValid()) {
                return;
            }

            k.set(webLogBean.toString());
            context.write(k, v);
        }
    }

    /**
     * Configures and submits the pre-processing job, then waits for it to finish.
     *
     * <p>An existing output directory is deleted recursively before submission.
     *
     * @param args command-line arguments (not used)
     * @throws IllegalStateException if the job completes unsuccessfully
     * @throws Exception             if file-system access or job submission fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", HDFS_URI);
        conf.set("yarn.resourcemanager.hostname", RESOURCE_MANAGER_HOST);

        // Remove output left over from a previous run, if there is any.
        FileSystem fs = FileSystem.get(new URI(HDFS_URI), conf, HDFS_USER);
        Path outdir = new Path(OUTPUT_DIR);
        if (fs.exists(outdir)) {
            fs.delete(outdir, true);
        }

        Job job = Job.getInstance(conf);
        job.setJobName(JOB_NAME);
        job.setJarByClass(WeblogPreProcess.class);
        job.setMapperClass(WeblogPreProcessMapper.class);
        // NOTE(review): no reducer class is set and the reduce-task count is left at
        // its default. If this job is meant to be map-only, job.setNumReduceTasks(0)
        // may be intended - confirm before changing, as it would alter the output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(INPUT_DIR));
        FileOutputFormat.setOutputPath(job, outdir);

        boolean res = job.waitForCompletion(true);
        if (!res) {
            // Surface the failure instead of returning normally, so the process
            // ends with a non-zero status and callers can detect it.
            throw new IllegalStateException("Job '" + JOB_NAME + "' did not complete successfully");
        }
        // The message previously named "WordCount", a leftover from another job.
        System.out.println("weblog程序运行成功!");
    }
}
|