MR实现平均成绩

发布 : 2016-03-20 分类 : 大数据 浏览 :
1
2
3
4
5
6
7
8
9
mapreduce是否可以完成我们传统开发中经常遇到的一些任务。

例如排序、平均数、批量word转换等。它和我们传统开发有什么不同。

那么我们可以带着下面问题来阅读:

1.mapreduce是如何求平均值的?
2.map在求平均值的作用是什么?
3.reduce在求平均值的作用是什么?

一、简介:

1
"平均成绩"主要目的还是在重温经典"WordCount"例子,可以说是在基础上的微变化版,该实例主要就是实现一个计算学生平均成绩的例子。

二、实例描述

1
2
3
4
5
对输入文件中数据进行就算学生平均成绩。

输入文件中的每行内容均为一个学生的姓名和他相应的成绩,如果有多门学科,则每门学科为一个文件。

要求在输出中每行有两个间隔的数据,其中,第一个代表学生的姓名,第二个代表其平均成绩。

math.txt

1
2
3
4
5
6
7
张三    88

李四 99

王五 66

赵六 77

chinese.txt

张三 78

李四 89

王五 96

赵六 67

english.txt

张三 80

李四 82

王五 84

赵六 86

三、设计思路

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
计算学生平均成绩是一个仿"WordCount"例子,用来重温一下开发MapReduce程序的流程。

程序包括两部分的内容:Map部分和Reduce部分,分别实现了map和reduce的功能。

Map处理的是一个纯文本文件,文件中存放的数据时每一行表示一个学生的姓名和他相应一科成绩。

Mapper处理的数据是由InputFormat分解过的数据集,其中InputFormat的作用是将数据集切割成小数据集InputSplit,

每一个InputSlit将由一个Mapper负责处理。

此外,InputFormat中还提供了一个RecordReader的实现,并将一个InputSplit解析成<key,value>对提供给了map函数。

InputFormat的默认值是TextInputFormat,它针对文本文件,按行将文本切割成InputSlit,并用LineRecordReader将InputSplit解析成<key,value>对,

key是行在文本中的位置,value是文件中的一行。

Map的结果会通过partion分发到Reducer,Reducer做完Reduce操作后,将通过以格式OutputFormat输出。

Mapper最终处理的结果对<key,value>,会送到Reducer中进行合并,合并的时候,有相同key的键/值对则送到同一个Reducer上。

Reducer是所有用户定制Reducer类地基础,它的输入是key和这个key对应的所有value的一个迭代器,同时还有Reducer的上下文。

Reduce的结果由Reducer.Context的write方法输出到文件中。
1
2
在HDFS文件系统上创建目录
[root@node1 hadoop-2.5.1]# hadoop fs -mkdir -p /usr/matrix/input/score
1
2
3
4
5
将文件上传到HDFS文件系统之上
[root@node1 hadoop-2.5.1]# hadoop fs -put data/chinese.txt /usr/matrix/input/score
[root@node1 hadoop-2.5.1]# hadoop fs -put data/english.txt /usr/matrix/input/score
[root@node1 hadoop-2.5.1]# hadoop fs -put data/math.txt /usr/matrix/input/score
[root@node1 hadoop-2.5.1]# hadoop fs -ls /usr/matrix/input/score

AvgscoreMapper.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.matrix.avgscore;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class AvgscoreMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String line = value.toString();

String[] str = StringUtils.split(line, "\t");

// String k = str[0];
// String v = str[1];
//
// System.out.println(line);
// System.out.println(k);
// System.out.println(v);

StringTokenizer tokenizer = new StringTokenizer(line, "\n");
// 分别对每行进行处理
while (tokenizer.hasMoreElements()) {
StringTokenizer tokenizerLine = new StringTokenizer(tokenizer.nextToken());
String strName = tokenizerLine.nextToken();// 学生姓名部分
String strScore = tokenizerLine.nextToken();// 成绩部分
Text name = new Text(strName);
int scoreInt = Integer.parseInt(strScore);

// 输出姓名和成绩

context.write(name, new IntWritable(scoreInt));
}

}

}

AvgscoreReducer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.matrix.avgscore;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AvgscoreReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

// 在Reduce中,会合并相同key的value值

@Override
protected void reduce(Text key, Iterable<IntWritable> value, Context context)
throws IOException, InterruptedException {

int score = 0;
int count = 0;

Iterator<IntWritable> it = value.iterator();
while (it.hasNext()) {
// 计算总分数
score += it.next().get();
// 统计同一个人的总科目数
count++;
}

// 计算平均成绩
int avg = score / count;

context.write(key, new IntWritable(avg));

}

}

AvgscoreTest.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
	package com.matrix.avgscore;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class AvgscoreTest {

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://node1:8020");

FileSystem fs = FileSystem.get(conf);

Job job = Job.getInstance(conf);

job.setJarByClass(AvgscoreTest.class);

job.setMapperClass(AvgscoreMapper.class);
job.setReducerClass(AvgscoreReducer.class);

// 将输入的数据集分割成小数据块splites,提供一个RecordReder的实现
job.setInputFormatClass(TextInputFormat.class);

// 提供一个RecordWriter的实现,负责数据输出
job.setOutputFormatClass(TextOutputFormat.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

FileInputFormat.setInputPaths(job, "/usr/matrix/input/score");

Path outer = new Path("/usr/matrix/output/score");

if (fs.exists(outer)) {
fs.delete(outer, true);
}

FileOutputFormat.setOutputPath(job, outer);

boolean f = job.waitForCompletion(true);

if (f) {
System.out.println("程序执行成功!");
}
}
}

运行结果:(计算平均成绩)

本文作者 : Matrix
原文链接 : https://matrixsparse.github.io/2016/03/20/MR案例之实现平均成绩/
版权声明 : 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!

知识 & 情怀 | 二者兼得

微信扫一扫, 向我投食

微信扫一扫, 向我投食

支付宝扫一扫, 向我投食

支付宝扫一扫, 向我投食

留下足迹