MR流量汇总程序开发

发布 : 2016-01-20 分类 : 大数据 浏览 :

需求

1
2
3
4
5
6
7
8
9
统计每一个用户(手机号)所耗费的总上行流量、下行流量、总流量

map端
读一行,切分字段
抽取手机号,上行流量,下行流量
context.write(手机号,bean)

reduce端
对同一手机号的所有记录:上行流量求和、下行流量求和,并计算总流量

代码结构

Markdown

数据准备

Markdown

FlowBean.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package com.matrix.flowsums;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Custom Hadoop value type carrying one phone number's traffic counters.
 * Serialized between map and reduce, so it implements {@link Writable}.
 */
public class FlowBean implements Writable {

    private long upFlow;  // upstream bytes
    private long dFlow;   // downstream bytes
    private long sumFlow; // upFlow + dFlow

    /**
     * Hadoop deserialization instantiates the bean reflectively, so an
     * explicit no-arg constructor must exist.
     */
    public FlowBean() {
    }

    /**
     * Builds a bean from the two raw counters; the total is derived here.
     *
     * @param upFlow upstream bytes
     * @param dFlow  downstream bytes
     */
    public FlowBean(long upFlow, long dFlow) {
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getdFlow() {
        return dFlow;
    }

    public void setdFlow(long dFlow) {
        this.dFlow = dFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /** Serialization: write the three counters in a fixed order. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Deserialization: read the fields back in exactly the order
     * {@link #write(DataOutput)} emitted them — Hadoop gives no framing.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dFlow = in.readLong();
        sumFlow = in.readLong();
    }

    /** Tab-separated "up down sum", the on-disk format of the reducer output. */
    @Override
    public String toString() {
        return String.format("%d\t%d\t%d", upFlow, dFlow, sumFlow);
    }
}

FlowSumsMapper.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.matrix.flowsums;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Map side: for each input line, extract the phone number (field 1) and the
 * up/down traffic counters (3rd- and 2nd-from-last fields), and emit
 * (phone, FlowBean).
 */
public class FlowSumsMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    // Reused output writables: context.write() serializes immediately, so one
    // instance per task avoids allocating two objects for every input record.
    private final Text outKey = new Text();
    private final FlowBean outBean = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // One input line, tab-separated.
        String[] fields = value.toString().split("\t");

        // The data is untidy; a record needs at least 4 fields to hold the
        // phone number plus the two trailing counters. Skip short lines
        // instead of killing the task with ArrayIndexOutOfBoundsException.
        if (fields.length < 4) {
            return;
        }

        String phoneNbr = fields[1];
        // Counters are counted from the end because leading fields vary.
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long dFlow = Long.parseLong(fields[fields.length - 2]);

        outKey.set(phoneNbr);
        outBean.setUpFlow(upFlow);
        outBean.setdFlow(dFlow);
        // The setters do not recompute the total, so set it explicitly.
        outBean.setSumFlow(upFlow + dFlow);
        context.write(outKey, outBean);
    }
}

FlowSumsReducer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.matrix.flowsums;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reduce side: receives every FlowBean emitted for one phone number
 * (e.g. &lt;13677228239, bean1&gt;, &lt;13677228239, bean2&gt;, ...) and
 * emits a single bean with the summed counters.
 */
public class FlowSumsReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        long totalUp = 0L;
        long totalDown = 0L;

        // Accumulate both directions across all beans of this phone number.
        for (FlowBean b : values) {
            totalUp += b.getUpFlow();
            totalDown += b.getdFlow();
        }

        // The constructor derives sumFlow = up + down for us.
        context.write(key, new FlowBean(totalUp, totalDown));
    }
}

FlowSumsRunner.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package com.matrix.flowsums;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.matrix.mr.WordCountMapper;
import com.matrix.mr.WordCountReducer;
import com.matrix.mr.WordCountRunner;

/**
 * Driver: describes the flow-summary job (mapper, reducer, key/value types,
 * input and output paths) and submits it to the YARN cluster.
 */
public class FlowSumsRunner {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.230.10:8020");
        conf.set("yarn.resourcemanager.hostname", "192.168.230.13");

        // Connect as "root" so we can clear a stale output directory —
        // MapReduce refuses to run if the output path already exists.
        FileSystem fs = FileSystem.get(//
                new URI("hdfs://192.168.230.10:8020"), //
                conf, //
                "root"//
        );

        Path outdir = new Path("/usr/matrix/output/flow");
        if (fs.exists(outdir)) {
            fs.delete(outdir, true);
        }

        Job job = Job.getInstance(conf);
        // Job name shown in the ResourceManager UI (this is not a user name).
        job.setJobName("flow");

        // Locate the jar containing this job's classes.
        job.setJarByClass(FlowSumsRunner.class);

        job.setMapperClass(FlowSumsMapper.class);
        job.setReducerClass(FlowSumsReducer.class);
        // Map-side output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // Reduce-side (final) output key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Where the input data lives and where the result goes.
        FileInputFormat.addInputPath(job, new Path("/usr/matrix/input/flow"));
        FileOutputFormat.setOutputPath(job, outdir);

        // Submit and wait; true = stream progress to the console.
        boolean res = job.waitForCompletion(true);
        if (res) {
            // BUGFIX: the message previously said "WordCount" — a copy-paste
            // leftover from another job's driver.
            System.out.println("流量汇总程序运行成功!");
        }
        // Propagate the job status so calling shell scripts can detect failure.
        System.exit(res ? 0 : 1);
    }
}

运行结果

Markdown

Markdown

本文作者 : Matrix
原文链接 : https://matrixsparse.github.io/2016/01/20/MR流量汇总程序开发/
版权声明 : 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!

知识 & 情怀 | 二者兼得

微信扫一扫, 向我投食

微信扫一扫, 向我投食

支付宝扫一扫, 向我投食

支付宝扫一扫, 向我投食

留下足迹