MapReduce Case Study: Mobile Phone Traffic

Published: 2016-01-20 | Category: Big Data

Serialization Principles

To travel between Hadoop nodes (map output is serialized for the shuffle), a custom type must implement Hadoop's Writable serialization interface rather than Java's built-in java.io.Serializable. FlowBean below implements WritableComparable, which extends Writable with the compareTo() used for key sorting.

Eclipse shortcuts:

main method template: type main, then Alt+/
System.out.println template: type sysout, then Alt+/

FlowBean.java

package com.matrix.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * FlowBean<BR>
 * Author: Matrix <BR>
 * Date: 2016-03-10 08:56:04 <BR>
 *
 * @version 1.0.0
 */
public class FlowBean implements WritableComparable<FlowBean> {

	// Why implement WritableComparable?
	// It extends Writable, Hadoop's serialization interface: implementing it
	// lets us define how the bean is written to and read from the wire, and
	// its compareTo() allows the bean to be used as a key (keys are sorted
	// during the shuffle, which the custom-sort job below relies on).

	// phone number
	private String phoneNB;
	// upstream traffic
	private long up_flow;
	// downstream traffic
	private long d_flow;
	// total traffic
	private long s_flow;

	// No-arg constructor: during deserialization the framework creates the
	// bean via reflection, which requires one, so we define it explicitly.
	public FlowBean() {
		super();
	}

	// Parameterized constructor for convenient initialization.
	public FlowBean(String phoneNB, long up_flow, long d_flow) {
		super();
		this.phoneNB = phoneNB;
		this.up_flow = up_flow;
		this.d_flow = d_flow;
		this.s_flow = up_flow + d_flow;
	}

	public String getPhoneNB() {
		return phoneNB;
	}

	public void setPhoneNB(String phoneNB) {
		this.phoneNB = phoneNB;
	}

	public long getUp_flow() {
		return up_flow;
	}

	public void setUp_flow(long up_flow) {
		this.up_flow = up_flow;
	}

	public long getD_flow() {
		return d_flow;
	}

	public void setD_flow(long d_flow) {
		this.d_flow = d_flow;
	}

	public long getS_flow() {
		return s_flow;
	}

	public void setS_flow(long s_flow) {
		this.s_flow = s_flow;
	}

	// Serialize the bean's fields into the data output stream.
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(phoneNB);
		out.writeLong(up_flow);
		out.writeLong(d_flow);
		out.writeLong(s_flow);
	}

	// Deserialize the bean's fields from the data input stream.
	// Fields must be read back in exactly the order they were written.
	@Override
	public void readFields(DataInput in) throws IOException {
		phoneNB = in.readUTF();
		up_flow = in.readLong();
		d_flow = in.readLong();
		s_flow = in.readLong();
	}

	// Sort descending by total traffic; required for FlowBean to serve as a
	// map output key in the custom-sort job (SortMR) below.
	@Override
	public int compareTo(FlowBean o) {
		return this.s_flow > o.s_flow ? -1 : 1;
	}

	@Override
	public String toString() {
		return "[phone=" + phoneNB + " up_flow=" + up_flow + " d_flow=" + d_flow + " s_flow=" + s_flow + "]";
	}

}
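
The write/readFields pair above defines the bean's wire format. Here is a minimal round-trip sketch (an illustrative test scaffold added for this writeup, not part of the original post; it is assumed to live in the same package as FlowBean, and the phone number and traffic values are made up):

package com.matrix.flowsum;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class FlowBeanRoundTrip {

	public static void main(String[] args) throws Exception {
		FlowBean original = new FlowBean("13726230503", 2481, 24681);

		// Serialize: write() fills a DataOutput, just as Hadoop does when
		// shipping map output across the network.
		ByteArrayOutputStream buffer = new ByteArrayOutputStream();
		original.write(new DataOutputStream(buffer));

		// Deserialize: the framework first calls the no-arg constructor via
		// reflection, then readFields() restores the fields in write order.
		FlowBean restored = new FlowBean();
		restored.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

		// Prints: [phone=13726230503 up_flow=2481 d_flow=24681 s_flow=27162]
		System.out.println(restored);
	}
}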

FlowSumMapper.java

package com.matrix.flowsum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Input: LongWritable (byte offset of the line), Text (one log line).
 * Output: Text (phone number), FlowBean (custom type).
 *
 * FlowSumMapper<BR>
 * Author: Matrix <BR>
 * Date: 2016-03-10 08:45:34 <BR>
 *
 * @version 1.0.0
 */
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

	// FlowBean is a custom data type. To be shipped between Hadoop nodes it
	// must follow Hadoop's serialization mechanism, i.e. implement the
	// Writable interface (here via WritableComparable).

	// Take one log line, extract the fields we need (phone number, upstream
	// traffic, downstream traffic), wrap them in a key/value pair and emit it.
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// Grab one line of input
		String line = value.toString();

		// Split it on the tab character
		String[] fields = StringUtils.split(line, "\t");

		// Pick out the fields we need
		// phone number
		String phoneNB = fields[1];
		// upstream traffic
		long up_flow = Long.parseLong(fields[7]);
		// downstream traffic
		long d_flow = Long.parseLong(fields[8]);

		// Wrap the data in a key/value pair and emit it
		context.write(new Text(phoneNB), new FlowBean(phoneNB, up_flow, d_flow));
	}

}
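
For reference, the mapper assumes each input line is a tab-separated record where, counting fields from 0, field 1 is the phone number, field 7 the upstream bytes, and field 8 the downstream bytes. The dataset itself is not shown in the post, so the sample line below is illustrative only:

1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com	24	27	2481	24681	200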

FlowSumReducer.java

package com.matrix.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

	// The reduce logic: iterate over the values, accumulate the sums, emit.
	@Override
	protected void reduce(Text key, Iterable<FlowBean> values, Context context)
			throws IOException, InterruptedException {
		long up_flow_counter = 0;
		long d_flow_counter = 0;

		// Sum the upstream and downstream traffic of the same phone number
		for (FlowBean flowBean : values) {
			up_flow_counter += flowBean.getUp_flow();
			d_flow_counter += flowBean.getD_flow();
		}

		// Emit the result
		context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
	}
}
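
Since the reducer's input and output types are both (Text, FlowBean) and summation is associative, the same class could optionally be registered as a combiner to pre-aggregate on the map side and shrink the shuffle. This line is an optional addition, not part of the original runner:

// Optional map-side pre-aggregation; safe because summing is associative
// and the reducer's input/output key-value types match.
job.setCombinerClass(FlowSumReducer.class);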

FlowSumRunner.java


package com.matrix.flowsum;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Job submission, written out in the standard style.
 *
 * FlowSumRunner<BR>
 * Author: Matrix <BR>
 * Date: 2016-03-10 16:24:29 <BR>
 *
 * @version 1.0.0
 */
public class FlowSumRunner {

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://node1:8020");

		FileSystem fs = FileSystem.get(//
				new URI("hdfs://node1:8020"), //
				conf, //
				"root"//
		);

		Job job = Job.getInstance(conf);
		// Set the job name
		job.setJobName("flowsum");

		job.setJarByClass(FlowSumRunner.class);

		job.setMapperClass(FlowSumMapper.class);
		job.setReducerClass(FlowSumReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlowBean.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		// job.setNumReduceTasks(3);

		FileInputFormat.setInputPaths(job, new Path("/usr/matrix/input/flowsum.dat"));

		// The output directory must not exist yet, so delete it if it does
		Path outdir = new Path("/usr/matrix/output/flowsum");
		if (fs.exists(outdir)) {
			fs.delete(outdir, true);
		}

		// Set the output directory
		FileOutputFormat.setOutputPath(job, outdir);
		boolean f = job.waitForCompletion(true);

		if (f) {
			System.out.println("FlowSum job completed successfully!");
		}

	}
}
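
Once the classes are packed into a jar (the jar name below is hypothetical), the job is submitted from a cluster node in the usual way:

hadoop jar flowsum.jar com.matrix.flowsum.FlowSumRunner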

Run result:

Processed file contents:

Custom Sort Implementation

Sort the summed file contents with a custom ordering. MapReduce sorts map output keys during the shuffle, so the approach is to emit FlowBean itself as the map output key; its compareTo() (see the FlowBean listing above) then defines the order.

SortMR.java

package com.matrix.SortMR;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.matrix.flowsum.FlowBean;

public class SortMR {

	public static class SortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {

		// Take one line, split out the fields (expected tab-separated layout:
		// phone number, upstream traffic, downstream traffic), wrap them in a
		// FlowBean and emit the bean as the key.
		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			// Grab one line of input
			String line = value.toString();
			// Split it into fields
			String[] fields = StringUtils.split(line, "\t");
			// phone number
			String phoneNB = fields[0];
			// upstream traffic
			long up_flow = Long.parseLong(fields[1]);
			// downstream traffic
			long d_flow = Long.parseLong(fields[2]);

			context.write(new FlowBean(phoneNB, up_flow, d_flow), NullWritable.get());
		}

	}

	public static class SortReducer extends Reducer<FlowBean, NullWritable, Text, FlowBean> {

		@Override
		protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context)
				throws IOException, InterruptedException {

			String phoneNB = key.getPhoneNB();
			context.write(new Text(phoneNB), key);
		}

	}

	public static void main(String[] args) {
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://node1:8020");

		try {

			FileSystem fs = FileSystem.get(conf);

			Job job = Job.getInstance(conf);

			job.setJarByClass(SortMR.class);

			job.setJobName("sortMR");

			job.setMapperClass(SortMapper.class);
			job.setReducerClass(SortReducer.class);

			job.setMapOutputKeyClass(FlowBean.class);
			job.setMapOutputValueClass(NullWritable.class);

			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(FlowBean.class);

			// Set the input path
			FileInputFormat.setInputPaths(job, "/usr/matrix/input/sort");

			// Set the output path
			Path outer = new Path("/usr/matrix/output/sortMR");

			// Delete the output path if it already exists
			if (fs.exists(outer)) {
				fs.delete(outer, true);
			}

			FileOutputFormat.setOutputPath(job, outer);

			boolean f = job.waitForCompletion(true);

			if (f) {
				System.out.println("SortMR job completed successfully!");
			}

		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}
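
Note that no explicit sort call appears anywhere above: the shuffle itself orders the map output keys with FlowBean's compareTo(). With the descending comparison on s_flow, the output should come out largest total first, roughly like this (the numbers are illustrative, not from the original post's screenshot):

13726238888	[phone=13726238888 up_flow=2481 d_flow=24681 s_flow=27162]
13560439658	[phone=13560439658 up_flow=2034 d_flow=5892 s_flow=7926]
13480253104	[phone=13480253104 up_flow=180 d_flow=180 s_flow=360]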

Run result:

Processed file:

Custom Partitioning

In the raw output, phone numbers that share a prefix are not necessarily placed in the same partition.

Goal: route phone numbers with the same prefix into the same partition, and therefore into the same output file.

FlowSumArea.java

package com.matrix.areapartition;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.matrix.flowsum.FlowBean;

/**
 * Analyze the raw traffic log and write the statistics for users from
 * different provinces to different output files.
 *
 * FlowSumArea<BR>
 * Author: Matrix <BR>
 * Date: 2016-03-11 08:10:02 <BR>
 *
 * @version 1.0.0
 */
public class FlowSumArea {

	// Two mechanisms need customizing:

	// 1. the partitioning logic: define our own Partitioner
	// 2. the number of concurrent reduce tasks

	// Mapper
	public static class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

			String line = value.toString();

			String[] fields = StringUtils.split(line, "\t");

			String phoneNB = fields[1];
			long up_flow = Long.parseLong(fields[7]);
			long d_flow = Long.parseLong(fields[8]);

			context.write(new Text(phoneNB), new FlowBean(phoneNB, up_flow, d_flow));

		}

	}

	// Reducer
	public static class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

		@Override
		protected void reduce(Text key, Iterable<FlowBean> values, Context context)
				throws IOException, InterruptedException {

			// upstream traffic counter
			long up_flow_counter = 0;

			// downstream traffic counter
			long d_flow_counter = 0;

			for (FlowBean value : values) {
				// accumulate the upstream traffic of the same phone number
				up_flow_counter += value.getUp_flow();
				// accumulate the downstream traffic of the same phone number
				d_flow_counter += value.getD_flow();
			}

			context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
		}

	}

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://node1:8020");

		FileSystem fs = FileSystem.get(conf);

		Job job = Job.getInstance(conf);
		job.setJarByClass(FlowSumArea.class);

		job.setMapperClass(FlowMapper.class);
		job.setReducerClass(FlowSumReducer.class);

		// Set the custom partitioner
		job.setPartitionerClass(AreaPartitioner.class);

		// One reduce task per partition: five known prefixes plus "other"
		job.setNumReduceTasks(6);

		// job.setMapOutputKeyClass(Text.class);
		// job.setMapOutputValueClass(FlowBean.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		FileInputFormat.addInputPaths(job, "/usr/matrix/input/flowsum.dat");

		Path outer = new Path("/usr/matrix/output/flowsumarea");

		if (fs.exists(outer)) {
			fs.delete(outer, true);
		}

		FileOutputFormat.setOutputPath(job, outer);

		boolean f = job.waitForCompletion(true);

		if (f) {
			System.out.println("FlowSumArea job completed successfully!");
		}

	}
}

AreaPartitioner.java

package com.matrix.areapartition;

import java.util.HashMap;

import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Custom partitioner.
 *
 * AreaPartitioner<BR>
 * Author: Matrix <BR>
 * Date: 2016-03-12 16:43:29 <BR>
 * @version 1.0.0
 */
public class AreaPartitioner<Key, Value> extends Partitioner<Key, Value> {

	private static HashMap<String, Integer> areaMap = new HashMap<String, Integer>();

	// Data dictionary: phone-number prefix -> partition number
	static {
		areaMap.put("135", 0);
		areaMap.put("136", 1);
		areaMap.put("137", 2);
		areaMap.put("138", 3);
		areaMap.put("139", 4);
	}

	@Override
	public int getPartition(Key key, Value value, int numPartitions) {
		// Take the phone number from the key, look up its prefix in the
		// dictionary, and return a different partition number per area;
		// unknown prefixes all fall into partition 5.
		Integer areaCode = areaMap.get(key.toString().substring(0, 3));
		return areaCode == null ? 5 : areaCode;
	}

}
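
A quick local sanity check of the prefix-to-partition mapping (an illustrative scaffold added for this writeup; the phone numbers are made up):

package com.matrix.areapartition;

import org.apache.hadoop.io.Text;

public class AreaPartitionerDemo {

	public static void main(String[] args) {
		AreaPartitioner<Text, Object> p = new AreaPartitioner<Text, Object>();
		System.out.println(p.getPartition(new Text("13502468823"), null, 6)); // 0 (prefix 135)
		System.out.println(p.getPartition(new Text("13925057413"), null, 6)); // 4 (prefix 139)
		System.out.println(p.getPartition(new Text("15013685858"), null, 6)); // 5 (unknown prefix -> "other")
	}
}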

Run result:

Data before partitioning:

Data after partitioning:

Note:

If the number of reduce tasks is greater than 1 but smaller than the number of partitions the partitioner can return, the job fails. The framework default is a single reduce task:

job.setNumReduceTasks(1);
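
How the reduce-task count interacts with the six partitions above (described from Hadoop's standard semantics, not from the original post):

// job.setNumReduceTasks(1);  // works: with a single reducer the custom partitioner is bypassed and everything lands in one file
// job.setNumReduceTasks(6);  // works: one output file per partition, part-r-00000 through part-r-00005
// job.setNumReduceTasks(3);  // fails at runtime with "Illegal partition" once getPartition() returns 3, 4 or 5
// job.setNumReduceTasks(10); // works: partitions 6 through 9 simply produce empty output files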
Author: Matrix
Original link: https://matrixsparse.github.io/2016/01/20/MR案例之手机流量/
Copyright notice: unless otherwise stated, all articles on this blog are licensed under CC BY-NC-SA 4.0. Please credit the source when reposting!
