package com.matrix.areapartition;
import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.matrix.flowsum.FlowBean;
public class FlowSumArea {

    // Mapper: parses one tab-separated log line and emits
    // (phone number, FlowBean(up_flow, d_flow)).
    public static class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            // Phone number is field 1; upstream traffic is field 7, downstream field 8.
            String phoneNB = fields[1];
            long up_flow = Long.parseLong(fields[7]);
            long d_flow = Long.parseLong(fields[8]);
            context.write(new Text(phoneNB), new FlowBean(phoneNB, up_flow, d_flow));
        }
    }
    // Reducer: sums upstream and downstream traffic per phone number.
    public static class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long up_flow_counter = 0;
            long d_flow_counter = 0;
            for (FlowBean value : values) {
                up_flow_counter += value.getUp_flow();
                d_flow_counter += value.getD_flow();
            }
            context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://node1:8020");
        FileSystem fs = FileSystem.get(conf);

        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowSumArea.class);
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        // Route records to reducers by area; see the AreaPartitioner sketch below.
        job.setPartitionerClass(AreaPartitioner.class);
        // The reduce-task count must cover every partition id the partitioner
        // can return; without this, the default single reducer makes the
        // partitioner a no-op. The value 6 matches the sketch below.
        job.setNumReduceTasks(6);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.addInputPaths(job, "/usr/matrix/input/flowsum.dat");

        // Clear the output directory first so reruns don't fail.
        Path outer = new Path("/usr/matrix/output/flowsumarea");
        if (fs.exists(outer)) {
            fs.delete(outer, true);
        }
        FileOutputFormat.setOutputPath(job, outer);

        boolean f = job.waitForCompletion(true);
        if (f) {
            System.out.println("Job completed successfully!");
        }
    }
}
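// The driver above references AreaPartitioner, which is not part of this
// listing. What follows is a minimal sketch, not the original class: the
// prefix-to-partition table and the fallback partition id 5 are assumptions,
// chosen only so that the partition ids line up with setNumReduceTasks(6)
// above. It would live as its own file in the same package.

package com.matrix.areapartition;

import java.util.HashMap;

import org.apache.hadoop.mapreduce.Partitioner;

public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {

    private static final HashMap<String, Integer> areaMap = new HashMap<String, Integer>();

    static {
        // Hypothetical mapping from phone-number prefix to partition id;
        // replace with the real area rules.
        areaMap.put("135", 0);
        areaMap.put("136", 1);
        areaMap.put("137", 2);
        areaMap.put("138", 3);
        areaMap.put("139", 4);
    }

    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
        // Keys are phone numbers (Text); route by the first three digits and
        // send unknown prefixes to the last partition (id 5).
        Integer area = areaMap.get(key.toString().substring(0, 3));
        return area == null ? 5 : area;
    }
}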
|
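// FlowBean is imported from com.matrix.flowsum and is presumably defined in
// an earlier listing. For completeness, here is a minimal sketch of what the
// code above requires of it: a no-arg constructor for Hadoop deserialization,
// the (phoneNB, up_flow, d_flow) constructor, the two getters, and Writable
// serialization. The field names and the toString format are assumptions.

package com.matrix.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {

    private String phoneNB;
    private long up_flow;
    private long d_flow;

    // Hadoop needs a no-arg constructor to instantiate the bean when
    // deserializing values on the reduce side.
    public FlowBean() {
    }

    public FlowBean(String phoneNB, long up_flow, long d_flow) {
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.d_flow = d_flow;
    }

    public long getUp_flow() {
        return up_flow;
    }

    public long getD_flow() {
        return d_flow;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNB);
        out.writeLong(up_flow);
        out.writeLong(d_flow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Must read fields in the same order they were written.
        phoneNB = in.readUTF();
        up_flow = in.readLong();
        d_flow = in.readLong();
    }

    @Override
    public String toString() {
        // TextOutputFormat writes toString(); emit up, down, and total traffic.
        return up_flow + "\t" + d_flow + "\t" + (up_flow + d_flow);
    }
}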