MR案例之单表连接

发布 : 2016-03-20 分类 : 大数据 浏览 :
1
2
3
实例中给出child-parent(孩子——父母)表,

要求输出grandchild-grandparent(孙子——爷奶)表。

具体实现:

1
2
3
4
在HDFS上创建目录,并上传文件数据

[root@node1 ~]# cd /opt/modules/hadoop-2.5.1/data
[root@node1 data]# vi stjoin.txt

1
2
[root@node1 data]# hadoop fs -mkdir -p /usr/matrix/input/stjoin/
[root@node1 data]# hadoop fs -put stjoin.txt /usr/matrix/input/stjoin/

stjoinMapper.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package com.matrix.stjoin;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class stjoinMapper extends Mapper<Object, Text, Text, Text> {

	/*
	 * Splits each input line into (child, parent) and emits the pair twice:
	 * once keyed by the parent with tag "1" (left table) and once keyed by
	 * the child with tag "2" (right table). The tag prefix lets the reducer
	 * tell the two relation directions apart. The header row starting with
	 * "child" is skipped.
	 */
	@Override
	protected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
		// Tokenize on whitespace. The original code filled a fixed String[2]
		// from a StringTokenizer, which threw ArrayIndexOutOfBoundsException
		// on lines with more than two tokens and NullPointerException on
		// lines with fewer; malformed lines are now skipped instead.
		String[] fields = value.toString().trim().split("\\s+");
		if (fields.length < 2) {
			return;
		}

		// Skip the "child parent" header line.
		if (fields[0].compareTo("child") != 0) {
			String childname = fields[0];
			String parentname = fields[1];

			// Left table: keyed by parent, value tagged "1".
			context.write(new Text(parentname), new Text("1" + "+" + childname + "+" + parentname));

			// Right table: keyed by child, value tagged "2".
			context.write(new Text(childname), new Text("2" + "+" + childname + "+" + parentname));
		}
	}

}

stjoinReducer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package com.matrix.stjoin;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;

public class stjoinReducer extends Reducer<Text, Text, Text, Text> {

	// Header-emitted flag. NOTE(review): being static, this only writes the
	// header exactly once when the job runs a single reducer task; with
	// multiple reducers each task writes its own header row.
	public static int time = 0;

	/*
	 * For each key (a person), collects the children from left-table records
	 * (tag "1") and the parents from right-table records (tag "2"), then
	 * emits the Cartesian product as (grandchild, grandparent) pairs.
	 * Record format produced by the mapper: "tag+child+parent".
	 */
	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
		// Write the table header once, before the first data row.
		if (0 == time) {
			context.write(new Text("grandchild"), new Text("grandparent"));
			time++;
		}

		// Growable lists replace the original fixed String[10] buffers,
		// which overflowed (ArrayIndexOutOfBoundsException) whenever a key
		// had more than ten children or parents.
		List<String> grandchildren = new ArrayList<String>();
		List<String> grandparents = new ArrayList<String>();

		for (Text value : values) {
			String record = value.toString();
			if (record.isEmpty()) {
				continue;
			}

			// Split "tag+child+parent"; limit 3 keeps any '+' inside the
			// last field intact, matching the original char-by-char parse.
			String[] parts = record.split("\\+", 3);
			if (parts.length < 3) {
				continue;
			}

			if ("1".equals(parts[0])) {
				// Left table (keyed by parent): the child is a grandchild.
				grandchildren.add(parts[1]);
			} else if ("2".equals(parts[0])) {
				// Right table (keyed by child): the parent is a grandparent.
				grandparents.add(parts[2]);
			}
		}

		// Cartesian product of grandchildren x grandparents.
		for (String grandchild : grandchildren) {
			for (String grandparent : grandparents) {
				context.write(new Text(grandchild), new Text(grandparent));
			}
		}
	}

}

stjoinTest.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.matrix.stjoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class stjoinTest {

	/*
	 * Driver: configures and submits the single-table-join job against HDFS,
	 * clearing any stale output directory first. Exits with status 0 on
	 * success and 1 on failure.
	 */
	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://node1:8020");

		FileSystem fs = FileSystem.get(conf);

		Job job = Job.getInstance(conf);

		job.setJarByClass(stjoinTest.class);

		job.setMapperClass(stjoinMapper.class);
		job.setReducerClass(stjoinReducer.class);

		// Mapper output types.
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		// Reducer output types.
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		FileInputFormat.setInputPaths(job, "/usr/matrix/input/stjoin");

		// Delete a stale output directory; the job fails if it exists.
		Path outer = new Path("/usr/matrix/output/stjoin");
		if (fs.exists(outer)) {
			fs.delete(outer, true);
		}

		FileOutputFormat.setOutputPath(job, outer);

		boolean success = job.waitForCompletion(true);

		if (success) {
			System.out.println("程序执行成功!");
		}

		// Bug fix: the original always exited 0, hiding job failures from
		// shell scripts and schedulers.
		System.exit(success ? 0 : 1);
	}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
grandchild	grandparent
Tom	Alice
Tom	Jesse
Jone	Alice
Jone	Jesse
Tom	Ben
Tom	Mary
Jone	Ben
Jone	Mary
Philip	Alice
Philip	Jesse
Mark	Alice
Mark	Jesse

本文作者 : Matrix
原文链接 : https://matrixsparse.github.io/2016/03/20/MR案例之单表连接/
版权声明 : 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!

知识 & 情怀 | 二者兼得

微信扫一扫, 向我投食

微信扫一扫, 向我投食

支付宝扫一扫, 向我投食

支付宝扫一扫, 向我投食

留下足迹