MR案例之数据排序

发布 : 2016-03-20 分类 : 大数据 浏览 :
1
2
3
"数据排序"是许多实际任务执行时要完成的第一项工作,比如学生成绩评比、数据建立索引等。

这个实例和数据去重类似,都是先对原始数据进行初步处理,为进一步的数据操作打好基础。下面进入这个示例。

实例描述:

1
2
3
对输入文件中数据进行排序。输入文件中的每行内容均为一个数字,即一个数据。

要求在输出中每行有两个间隔的数字,其中,第一个代表原始数据在原始数据集中的位次,第二个代表原始数据。

f1.txt

1
2
3
4
5
6
7
8
9
10
11
12
13
2

32

654

32

15

756

65223

编辑f1.txt

1
[root@node1 data]# vi f1.txt

f2.txt

1
2
3
4
5
6
7
5956

22

650

92

编辑f2.txt

1
[root@node1 data]# vi f2.txt

f3.txt

1
2
3
4
5
26

54

6

编辑f3.txt

1
[root@node1 data]# vi f3.txt

1
2
3
4
5
[root@node1 hadoop-2.5.1]# hadoop fs -mkdir -p /usr/matrix/input/datasort/
[root@node1 hadoop-2.5.1]# hadoop fs -put data/f1.txt /usr/matrix/input/datasort/
[root@node1 hadoop-2.5.1]# hadoop fs -put data/f2.txt /usr/matrix/input/datasort/
[root@node1 hadoop-2.5.1]# hadoop fs -put data/f3.txt /usr/matrix/input/datasort/
[root@node1 hadoop-2.5.1]# hadoop fs -ls /usr/matrix/input/datasort/

DataSortMapper.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.matrix.datasort;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DataSortMapper extends Mapper<Object, Text, IntWritable, IntWritable> {

private static IntWritable data = new IntWritable();

// Map将输入的value转换为IntWritable并作为输出的key
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {

String line = value.toString();

data.set(Integer.parseInt(line));

// new IntWritable(Integer.parseInt(line))

context.write(data, new IntWritable(1));

}

}

DataSortReducer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.matrix.datasort;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class DataSortReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

private static IntWritable linesum = new IntWritable(1);

@Override
protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
for (IntWritable value : values) {
context.write(linesum, key);
linesum = new IntWritable(linesum.get() + 1);
}
}

}

DataSortTest.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.matrix.datasort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DataSortTest {

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://node1:8020");

FileSystem fs = FileSystem.get(conf);

Job job = Job.getInstance(conf);

job.setJarByClass(DataSortTest.class);

job.setMapperClass(DataSortMapper.class);
job.setReducerClass(DataSortReducer.class);

job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);

FileInputFormat.setInputPaths(job, "/usr/matrix/input/datasort");

Path outer = new Path("/usr/matrix/output/datasort/");

if (fs.exists(outer)) {
fs.delete(outer, true);
}

FileOutputFormat.setOutputPath(job, outer);

boolean f = job.waitForCompletion(true);

if (f) {
System.out.println("程序执行成功!");
}

}
}
本文作者 : Matrix
原文链接 : https://matrixsparse.github.io/2016/03/20/MR案例之数据排序/
版权声明 : 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!

知识 & 情怀 | 二者兼得

微信扫一扫, 向我投食

微信扫一扫, 向我投食

支付宝扫一扫, 向我投食

支付宝扫一扫, 向我投食

留下足迹