MapReduce Join

Published: 2016-02-12 | Category: Big Data

Approach

In the map phase, read the student_class_info.txt and student_info.txt files, tag each record with the name of the file it came from, and use the join field as the map output key; the reduce phase then computes the Cartesian product of the matching records.
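
For example, suppose the two tab-separated input files contain the sample records used later in this post: student_info.txt holds one record per student in the form <student name> \t <class id>, and student_class_info.txt holds one record per class in the form <class id> \t <class name> (the layouts here are inferred from the field indices in the mapper below):

student_info.txt:
Jenny	00001

student_class_info.txt:
00001	Chinese
00001	Math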

Mapper

In the map() function, getPath() determines which file the current record comes from; a new key-value pair is assembled according to that file's format, and a file flag is appended to the value on output (records from student_info.txt are tagged "1", records from student_class_info.txt are tagged "r").
package com.matrix.join;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {

    // Left table
    public static final String LEFT_FILENAME = "student_info.txt";
    // Right table
    public static final String RIGHT_FILENAME = "student_class_info.txt";
    // Left-table flag
    public static final String LEFT_FILENAME_FLAG = "1";
    // Right-table flag
    public static final String RIGHT_FILENAME_FLAG = "r";

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Get the HDFS path of the split this record came from
        String filePath = ((FileSplit) context.getInputSplit()).getPath().toString();
        String fileFlag = null;
        String joinKey = null;
        String joinValue = null;

        String[] fields = value.toString().split("\t");

        // Decide which file the record came from and parse it according to that file's format
        if (filePath.contains(LEFT_FILENAME)) {
            // student_info.txt: <student name> \t <class id>
            fileFlag = LEFT_FILENAME_FLAG;
            joinKey = fields[1];
            joinValue = fields[0];
        } else if (filePath.contains(RIGHT_FILENAME)) {
            // student_class_info.txt: <class id> \t <class name>
            fileFlag = RIGHT_FILENAME_FLAG;
            joinKey = fields[0];
            joinValue = fields[1];
        } else {
            // Skip records from unexpected files instead of writing a null key
            return;
        }

        // Emit the join key, tagging the value with its source-file flag
        context.write(new Text(joinKey), new Text(joinValue + "\t" + fileFlag));
    }

}
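
With the sample records above, JoinMapper emits the (key, value) pairs below; every value carries a trailing flag ("1" for the left table, "r" for the right) so the reducer can tell which table it came from:

00001	Jenny	1
00001	Chinese	r
00001	Math	r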

Reducer

The actual join is done by the Reducer. Because map() uses the join field as its output key, a single reduce() call receives the one matching record from student_info.txt together with the n matching records from student_class_info.txt in one iteration, for example:

Jenny 00001
00001 Chinese
00001 Math

The final loop computes the Cartesian product of the two sides and writes it out.
package com.matrix.join;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class JoinReducer extends Reducer<Text, Text, Text, Text> {

    // Left-table flag (must match JoinMapper)
    public static final String LEFT_FILENAME_FLAG = "1";
    // Right-table flag (must match JoinMapper)
    public static final String RIGHT_FILENAME_FLAG = "r";

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        Iterator<Text> iterator = values.iterator();

        List<String> studentClassNames = new ArrayList<String>();
        String studentName = "";

        while (iterator.hasNext()) {
            String[] infos = iterator.next().toString().split("\t");
            // Use the flag to tell which file the value came from and parse it accordingly
            if (infos[1].equals(LEFT_FILENAME_FLAG)) {
                studentName = infos[0];
            } else if (infos[1].equals(RIGHT_FILENAME_FLAG)) {
                studentClassNames.add(infos[0]);
            }
        }

        // Cartesian product: pair the student name with every class name
        for (String className : studentClassNames) {
            context.write(new Text(studentName), new Text(className));
        }

    }

}
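
For the sample data, reduce() receives the key 00001 with the values {Jenny	1, Chinese	r, Math	r}, and the final loop writes out the joined pairs:

Jenny	Chinese
Jenny	Math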

Main

package com.matrix.join;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Join {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.230.10:8020");
        conf.set("yarn.resourcemanager.hostname", "192.168.230.13");

        FileSystem fs = FileSystem.get(
                new URI("hdfs://192.168.230.10:8020"), conf, "root");

        // Delete the output directory if it already exists
        Path outdir = new Path("/usr/matrix/output/join");
        if (fs.exists(outdir)) {
            fs.delete(outdir, true);
        }

        Job job = Job.getInstance(conf);
        // Set the job name
        job.setJobName("join");

        // The jar containing this job's classes
        job.setJarByClass(Join.class);

        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Input path (both input files live in this directory)
        FileInputFormat.addInputPath(job, new Path("/usr/matrix/input/join"));
        // Output path
        FileOutputFormat.setOutputPath(job, outdir);

        // Submit the job to the YARN cluster and wait for it to finish
        boolean res = job.waitForCompletion(true);
        if (res) {
            System.out.println("Join job completed successfully!");
        }
    }
}
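
To run the job, package the three classes into a jar and submit it with the standard hadoop jar command (the jar file name below is just a placeholder):

hadoop jar matrix-join.jar com.matrix.join.Join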
