MR案例之自定义分区

发布 : 2016-01-12 分类 : 大数据 浏览 :

FlowSumArea.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package com.matrix.areapartition;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.matrix.flowsum.FlowBean;

/**
* 对流量原始日志进行分析,将不同省份的用户统计分析到不同文件
*
*
*
* FlowSumArea<BR>
* 创建人:Matrix <BR>
* 时间:2016年3月11日-上午8:10:02 <BR>
*
* @version 1.0.0
*
*/
/**
 * Analyzes raw traffic logs and, via a custom partitioner, writes the
 * statistics for users of different provinces (phone-number prefixes)
 * into different output files.
 *
 * FlowSumArea<BR>
 * Author: Matrix <BR>
 * Date: 2016-03-11 08:10:02 <BR>
 *
 * @version 1.0.0
 */
public class FlowSumArea {

    // Two mechanisms are customized for this job:
    // 1. The partitioning logic: a custom Partitioner (AreaPartitioner).
    // 2. The number of concurrent reduce tasks (must match the partition count).

    /**
     * Mapper: parses one tab-separated log line and emits
     * (phone number, FlowBean(phone, upFlow, downFlow)).
     */
    public static class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String line = value.toString();

            String[] fields = StringUtils.split(line, "\t");

            // Field layout assumed by this job: index 1 = phone number,
            // index 7 = upstream bytes, index 8 = downstream bytes.
            // NOTE(review): lines with fewer fields will throw
            // ArrayIndexOutOfBoundsException — confirm input is pre-cleaned.
            String phoneNB = fields[1];
            // Primitive long instead of boxed Long: these are pure counters,
            // boxing added allocation for no benefit.
            long up_flow = Long.parseLong(fields[7]);
            long d_flow = Long.parseLong(fields[8]);

            context.write(new Text(phoneNB), new FlowBean(phoneNB, up_flow, d_flow));
        }
    }

    /**
     * Reducer: sums the upstream and downstream flow of all records
     * sharing the same phone number.
     */
    public static class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {

            // Upstream flow accumulator.
            long up_flow_counter = 0;

            // Downstream flow accumulator (original comment was a copy-paste
            // duplicate of the upstream one).
            long d_flow_counter = 0;

            for (FlowBean value : values) {
                // Accumulate upstream flow for this phone number.
                up_flow_counter += value.getUp_flow();
                // Accumulate downstream flow for this phone number.
                d_flow_counter += value.getD_flow();
            }

            context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
        }
    }

    /**
     * Job driver: configures mapper, reducer, custom partitioner and the
     * reduce-task parallelism, then submits the job and waits for completion.
     */
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://node1:8020");

        FileSystem fs = FileSystem.get(conf);

        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowSumArea.class);

        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        // Install the custom partitioner that routes records by phone prefix.
        job.setPartitionerClass(AreaPartitioner.class);

        // Map output types default to the job output types below, so the
        // explicit setMapOutputKeyClass/ValueClass calls are unnecessary here.

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        /**
         * The number of reduce tasks must match the number of partitions
         * AreaPartitioner can return (6 known prefixes + 1 default = 7).
         * Setting it higher produces empty files; setting it lower makes
         * getPartition return an out-of-range partition and fail the job.
         * A reduce/map task is an instance of the reducer/mapper running in
         * the cluster; in the process list they appear as "yarn-child".
         */
        job.setNumReduceTasks(7);

        FileInputFormat.addInputPaths(job, "/usr/matrix/input/flowsum.dat");

        Path outer = new Path("/usr/matrix/output/flowsumarea");

        // MapReduce refuses to run if the output directory already exists,
        // so remove any leftovers from previous runs.
        if (fs.exists(outer)) {
            fs.delete(outer, true);
        }

        FileOutputFormat.setOutputPath(job, outer);

        boolean f = job.waitForCompletion(true);

        if (f) {
            System.out.println("程序运行成功!");
        }
    }
}

AreaPartitioner.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.matrix.areapartition;

import java.util.HashMap;

import org.apache.hadoop.mapreduce.Partitioner;

/**
* 自定义分区类
*
* AreaPartitioner<BR>
* 创建人:Matrix <BR>
* 时间:2016年3月12日-下午4:43:29 <BR>
*
* @version 1.0.0
*
*/
/**
 * Custom partitioner: looks up the first three digits of the phone number
 * (the map output key) in a prefix dictionary and returns the partition
 * number for that "province"; unknown prefixes go to the catch-all
 * partition 6. Must be used with exactly 7 reduce tasks.
 *
 * AreaPartitioner<BR>
 * Author: Matrix <BR>
 * Date: 2016-03-12 16:43:29 <BR>
 *
 * @version 1.0.0
 */
public class AreaPartitioner<Key, Value> extends Partitioner<Key, Value> {

    /** Default partition for phone prefixes not present in the dictionary. */
    private static final int DEFAULT_PARTITION = 6;

    // Parameterized instead of the original raw-type "new HashMap()"
    // (removes the unchecked-conversion warning).
    private static HashMap<String, Integer> areaMap = new HashMap<String, Integer>();

    // Data dictionary: phone prefix -> partition number.
    static {
        areaMap.put("134", 0);
        areaMap.put("135", 1);
        areaMap.put("136", 2);
        areaMap.put("137", 3);
        areaMap.put("138", 4);
        areaMap.put("139", 5);
    }

    @Override
    public int getPartition(Key key, Value value, int numPartitions) {

        // Take the 3-digit prefix from the key, look up its home partition.
        // Single map lookup instead of the original's two identical get()
        // calls; unknown prefixes fall back to the default partition.
        // NOTE(review): keys shorter than 3 characters would throw
        // StringIndexOutOfBoundsException — same as the original; confirm
        // all keys are full phone numbers.
        Integer areaCode = areaMap.get(key.toString().substring(0, 3));
        return areaCode == null ? DEFAULT_PARTITION : areaCode;
    }
}

运行结果:

本文作者 : Matrix
原文链接 : https://matrixsparse.github.io/2016/01/12/MR案例之自定义分区/
版权声明 : 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!

知识 & 情怀 | 二者兼得

微信扫一扫, 向我投食

微信扫一扫, 向我投食

支付宝扫一扫, 向我投食

支付宝扫一扫, 向我投食

留下足迹