MR二次排序

发布 : 2016-01-12 分类 : 大数据 浏览 :

思路:

1
二次排序的含义为先按某列对数据进行排序,在该次排序的基础上再按照另一列的值进行排序
1
2
3
4
5
6
7
8
9
4	3
4 2
4 1
3 4
2 7
2 3
3 1
3 2
3 3
1
这是原始数据集,经过二次排序后,输出的数据为:
1
2
3
4
5
6
7
8
9
2	3
2 7
3 1
3 2
3 3
3 4
4 1
4 2
4 3
1
由于Hadoop框架默认会进行排序,所以完成二次排序的关键在于控制Hadoop的排序操作

Mapper

1
在map函数中,将输入的value作为输出key输出,value代表一整行数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.matrix.secondarysort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
* 在map()函数中,将输入的value作为输出的key输出,value代表一行数据
*
* SecondaryMapper<BR>
* 创建人:Matrix <BR>
* @version 1.0.0
*
*/
public class SecondaryMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

// 仅仅是将value作为key输出
context.write(value, NullWritable.get());

}

}

Partition

1
在分区过程中,按照第一个排序进行分发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.matrix.secondarysort;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
* 在分区过程中,按照第一个排序字段进行分发
*
* KeyPartitioner<BR>
* 创建人:Matrix <BR>
*
* @version 1.0.0
*
*/
public class KeyPartitioner extends HashPartitioner<Text, NullWritable> {

@Override
public int getPartition(Text key, NullWritable value, int numReduceTasks) {
return (key.toString().split("\t")[0].hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}

}

SortComparator

1
SortComparator是本例中最为关键的类,它改变了Hadoop
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.matrix.secondarysort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
* SortComparator是本例中最为关键的类,它改变了Hadoop默认的排序规则,并按照二次排序的逻辑进行排序
*
* SortComparator<BR>
* 创建人:Matrix <BR>
*
* @version 1.0.0
*
*/
public class SortComparator extends WritableComparator {

protected SortComparator() {
super(Text.class, true);
}

@Override
public int compare(WritableComparable key1, WritableComparable key2) {
// 如果一个排序字段相同,则需比较第二个排序字段
if (Integer.parseInt(key1.toString().split("\t")[0]) == Integer.parseInt(key2.toString().split("\t")[0])) {
if (Integer.parseInt(key1.toString().split("\t")[1]) > Integer.parseInt(key2.toString().split("\t")[1])) {
return 1;
} else if (Integer.parseInt(key1.toString().split("\t")[1]) < Integer
.parseInt(key2.toString().split("\t")[1])) {
return -1;
} else if (Integer.parseInt(key1.toString().split("\t")[1]) == Integer
.parseInt(key2.toString().split("\t")[1])) {
return 0;
}
// 如果第一个排序字段不同,则比较第一个排序字段
} else {
if (Integer.parseInt(key1.toString().split("\t")[0]) > Integer.parseInt(key2.toString().split("\t")[0])) {
return 1;
} else if (Integer.parseInt(key1.toString().split("\t")[0]) < Integer
.parseInt(key2.toString().split("\t")[0])) {
return -1;
}
}
return 0;
}

}

Reducer

1
大部分工作已经由SortComparator完成,在数据进入reduce函数之前就已经是按照要求排好序的了,所以Reducer只需要原样输出即可
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.matrix.secondarysort;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SecondaryReducer extends Reducer<Text, IntWritable, NullWritable, Text> {

public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

for(Text value: values){
context.write(NullWritable.get(), value);
}

}

}

Main

1
2
在Driver类中进行常规的作业设置,但是需要注意一点的是必须手动将Reducer的个数设置为1,这样出来的结果才会是全局有序的,否则只是在每个
Reducer中有序。为了让作业处理的更合理,我们还可以在数据进入reduce函数时默认再分一次组,Hadoop默认是按key来分组
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package com.matrix.secondarysort;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.matrix.wordcount.WordCount;
import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;

public class SecondarySort {

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.230.10:8020");
conf.set("yarn.resourcemanager.hostname", "192.168.230.13");

FileSystem fs = FileSystem.get(//
new URI("hdfs://192.168.230.10:8020"), conf, "root");

Path outdir = new Path("/usr/matrix/output/secondarysort");
if (fs.exists(outdir)) {
fs.delete(outdir, true);
}

Job job = Job.getInstance(conf);
// 设置用户名称
job.setJobName("secondarysort");

// 指定我这个job所在jar包
job.setJarByClass(WordCount.class);

job.setMapperClass(SecondaryMapper.class);
job.setReducerClass(SecondaryReducer.class);

job.setPartitionerClass(KeyPartitioner.class);
job.setSortComparatorClass(SortComparator.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);

job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

// 指定输入路径
FileInputFormat.addInputPath(job, new Path("/usr/matrix/input/secondarysort"));
// 指定输出路径
FileOutputFormat.setOutputPath(job, outdir);

// 需要将Reducer的个数强制设定为1
job.setNumReduceTasks(1);


// 向yarn集群提交这个job
boolean res = job.waitForCompletion(true);
if (res) {
System.out.println("WordCount程序运行成功!");
}

}

}
本文作者 : Matrix
原文链接 : https://matrixsparse.github.io/2016/01/12/MR二次排序/
版权声明 : 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!

知识 & 情怀 | 二者兼得

微信扫一扫, 向我投食

微信扫一扫, 向我投食

支付宝扫一扫, 向我投食

支付宝扫一扫, 向我投食

留下足迹