MR Flow Summary Statistics with Partitioning

Published: 2016-03-20 | Category: Big Data

Requirements

Output the flow statistics to different files according to the phone number's home location, so that when querying the statistics the results can be examined at the province level.

Analysis

In MapReduce, the key-value pairs emitted by the map phase are grouped by key and then distributed to the different reduce tasks.
The default distribution rule is: partition index = key's hashcode % number of reduce tasks.
So, to group the data according to our own needs, we have to replace the data-distribution (grouping) component, the Partitioner:
define a custom CustomPartitioner that extends the abstract class Partitioner,
then register it on the job object: job.setPartitionerClass(CustomPartitioner.class)
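
To make the default rule concrete, here is a minimal sketch of a partitioner that reproduces that hash-based distribution. The class name DefaultHashStylePartitioner is purely illustrative and is not part of this project; Hadoop's built-in HashPartitioner does essentially the same thing.

package com.matrix.flowsums;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Illustrative sketch only: the partition index is the key's hashCode modulo
// the number of reduce tasks; the bit mask keeps the value non-negative.
public class DefaultHashStylePartitioner extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text key, FlowBean value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}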

Code structure

The project consists of five classes in the package com.matrix.flowsums: FlowBean, FlowSumsMapper, ProvincePartitioner, FlowSumsReducer, and FlowSumsRunner.

Data preparation

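The screenshot of the sample data is not reproduced here. Judging from the Mapper below, each input record is a tab-separated line whose second field is the subscriber's phone number and whose third-to-last and second-to-last fields are the upstream and downstream byte counts; the number of fields in between may vary. A purely made-up line in that shape (the values are illustrative, not real data) could look like:

1363157985066  13726230503  AA-BB-CC-DD-EE-FF:CMCC  10.0.0.1  example.com  20  20  2481  24681  200

Here 13726230503 is the phone number, 2481 the upstream traffic, and 24681 the downstream traffic.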

Bean

package com.matrix.flowsums;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {

    // upstream traffic
    private long upFlow;
    // downstream traffic
    private long dFlow;
    // total traffic
    private long sumFlow;

    // Deserialization instantiates the bean reflectively via the no-arg
    // constructor, so one must be defined explicitly.
    public FlowBean() {

    }

    public FlowBean(long upFlow, long dFlow) {
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getdFlow() {
        return dFlow;
    }

    public void setdFlow(long dFlow) {
        this.dFlow = dFlow;
    }

    // serialization method
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dFlow);
        out.writeLong(sumFlow);
    }

    // deserialization method
    // Note: fields must be read in exactly the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + dFlow + "\t" + sumFlow;
    }

}

Mapper

package com.matrix.flowsums;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowSumsMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // convert the line to a String
        String line = value.toString();
        // split the fields (the data is not perfectly regular)
        String[] fields = line.split("\t");
        // extract the phone number
        String phoneNbr = fields[1];
        // extract the upstream and downstream traffic, counted from the end
        // of the record since the number of middle fields varies
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long dFlow = Long.parseLong(fields[fields.length - 2]);
        context.write(new Text(phoneNbr), new FlowBean(upFlow, dFlow));
    }

}

Partition

package com.matrix.flowsums;

import java.util.HashMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Defines our own rule for distributing (grouping) data from map to reduce:
 * ProvincePartitioner distributes by the province the phone number belongs to.
 * The default partitioning component is HashPartitioner.
 */
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();

    static {
        provinceMap.put("135", 0);
        provinceMap.put("136", 1);
        provinceMap.put("137", 2);
        provinceMap.put("138", 3);
        provinceMap.put("139", 4);
    }

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {

        // debug output: the full key and its 3-digit prefix
        System.out.println("key.toString():" + key.toString());
        System.out.println("key.toString().substring(0, 3):" + key.toString().substring(0, 3));

        Integer code = provinceMap.get(key.toString().substring(0, 3));

        // any prefix not in the map falls into partition 5
        return code == null ? 5 : code;
    }

}

Reducer

package com.matrix.flowsums;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowSumsReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    // an input group looks like: <13677228239,bean1>,<13677228239,bean2>,<13677228239,bean3>,<13677228239,bean4>
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        long sum_upFlow = 0;
        long sum_dFlow = 0;

        // iterate over all beans, accumulating upstream and downstream traffic separately
        for (FlowBean bean : values) {
            sum_upFlow += bean.getUpFlow();
            sum_dFlow += bean.getdFlow();
        }

        // emit the upstream and downstream totals
        FlowBean resultBean = new FlowBean(sum_upFlow, sum_dFlow);
        context.write(key, resultBean);
    }

}

Main

Note:
If the number of reduce tasks >= the number of partitions returned by getPartition, a few extra empty output files part-r-000xx are produced.
If 1 < number of reduce tasks < number of partitions, some of the partitioned data has nowhere to go and the job throws an Exception!
If the number of reduce tasks = 1, then no matter how many partitions the map tasks produce, everything is handed to that single reduce task,
and only one result file, part-r-00000, is produced. Since ProvincePartitioner can return 0 through 5, the job below sets six reduce tasks.
package com.matrix.flowsums;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSumsRunner {

    // Describe the business logic (which class is the mapper, which is the reducer,
    // where the input data lives, where the output goes, ...) as a Job object,
    // then submit that Job to the cluster to run.
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.230.10:8020");
        conf.set("yarn.resourcemanager.hostname", "192.168.230.13");

        FileSystem fs = FileSystem.get(//
                new URI("hdfs://192.168.230.10:8020"), //
                conf, //
                "root"//
        );

        // delete the output directory if it already exists
        Path outdir = new Path("/usr/matrix/output/flow");
        if (fs.exists(outdir)) {
            fs.delete(outdir, true);
        }

        Job job = Job.getInstance(conf);
        // set the job name
        job.setJobName("flow");

        // specify the jar that contains this job
        job.setJarByClass(FlowSumsRunner.class);

        job.setMapperClass(FlowSumsMapper.class);
        job.setReducerClass(FlowSumsReducer.class);
        // set the output key/value types of our Mapper class
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // set the output key/value types of our Reducer class
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // set the custom partitioner
        job.setPartitionerClass(ProvincePartitioner.class);
        // after customizing the partitioner, set a matching number of reduce tasks
        job.setNumReduceTasks(6);

        // specify where the input data lives
        // FileInputFormat.setInputPaths(job, "/usr/matrix/input/");
        FileInputFormat.addInputPath(job, new Path("/usr/matrix/input/flow"));
        // specify where the results should be stored
        FileOutputFormat.setOutputPath(job, outdir);

        // submit the job to the YARN cluster
        boolean res = job.waitForCompletion(true);
        if (res) {
            System.out.println("FlowSums job completed successfully!");
        }

    }
}

Execution results

The screenshots of the output are not reproduced here. With the six reduce tasks configured above, the job produces six output files, part-r-00000 through part-r-00005, one per partition (prefixes 135, 136, 137, 138, 139, and everything else), each containing lines of phone number, upstream, downstream, and total traffic.

Author: Matrix
Original link: https://matrixsparse.github.io/2016/03/20/MR流量统计分区/
Copyright notice: Unless otherwise stated, all articles on this blog are licensed under CC BY-NC-SA 4.0. Please credit the source when republishing!
