社交粉丝数据分析

发布 : 2016-03-27 分类 : 大数据 浏览 :
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
以下是qq的好友列表数据,冒号前是一个用,冒号后是该用户的所有好友(数据中的好友关系是单向的)

A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J

求出哪些人两两之间有共同好友,及他俩的共同好友都有谁?
解决思路:
第一步
map
读一行 A:B,C,D,F,E,O
输出<B,A><C,A><D,A><F,A><E,A><O,A>
在读一行 B:A,C,E,K
输出 <A,B><C,B><E,B><K,B>


REDUCE
拿到的数据比如<C,A><C,B><C,E><C,F><C,G>......
输出:
<A-B,C>
<A-E,C>
<A-F,C>
<A-G,C>
<B-E,C>
<B-F,C>.....

第二步
map
读入一行<A-B,C>
直接输出<A-B,C>

reduce
读入数据 <A-B,C><A-B,F><A-B,G>.......
输出: A-B C,F,G,.....

数据准备

Markdown

第一个MR程序

SharedFriendsStepOneMapper.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.matrix.sharedFriend;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SharedFriendsStepOneMapper extends Mapper<LongWritable, Text, Text, Text> {

@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {

// A:B,C,D,F,E,O
String line = value.toString();
String[] person_friends = line.split(":");
String person = person_friends[0];// A
String friends = person_friends[1];// B,C,D,F,E,O

for(String friend : friends.split(",")){
// 输出<好友,人>
context.write(new Text(friend),new Text(person));
}
}
}

SharedFriendsStepOneReducer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.matrix.sharedFriend;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SharedFriendsStepOneReducer extends Reducer<Text, Text, Text, Text> {

@Override
protected void reduce(Text friend, Iterable<Text> persons, Context context)
throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer();
for(Text person : persons){
sb.append(person).append(",");
}
context.write(friend, new Text(sb.toString()));
}
}

SharedFriendsStepOneRunner.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.matrix.sharedFriend;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SharedFriendsStepOneRunner {

// 把业务逻辑相关的信息(哪个是mapper,哪个是reducer,要处理的数据在哪里,输出的结果放哪里.....)
// 描述成一个job对象
// 把这个描述好的job提交给集群去运行
public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.230.10:8020");
conf.set("yarn.resourcemanager.hostname", "192.168.230.13");

FileSystem fs = FileSystem.get(//
new URI("hdfs://192.168.230.10:8020"), //
conf, //
"root"//
);

Path outdir = new Path("/usr/matrix/output/share");
if (fs.exists(outdir)) {
fs.delete(outdir, true);
}

Job job = Job.getInstance(conf);
// 设置用户名称
job.setJobName("share");

// 指定我这个job所在jar包
job.setJarByClass(SharedFriendsStepOneRunner.class);

job.setMapperClass(SharedFriendsStepOneMapper.class);
job.setReducerClass(SharedFriendsStepOneReducer.class);

// 设置我们的业务逻辑Mapper类的输出key和value的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 设置我们的业务逻辑Reducer类的输出key和value的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

// 指定要处理的数据所在的位置
// FileInputFormat.setInputPaths(job, "/usr/matrix/input/");
FileInputFormat.addInputPath(job, new Path("/usr/matrix/input/share"));
// 指定处理完成之后的结果所保存的位置
FileOutputFormat.setOutputPath(job, outdir);

// 向yarn集群提交这个job
boolean res = job.waitForCompletion(true);
if (res) {
System.out.println("程序运行成功!");
}

}
}

运行结果

Markdown

第二个MR程序

SharedFriendsStepTwoMapper.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.matrix.sharedFriend;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SharedFriendsStepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {

// 拿到的数据是上一个步骤的输出结果
// A I,K,C,B,G,F,H,O,D,
// 友 人,人,人

@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {

String line = value.toString();
String[] friend_persons = line.split("\t");

String friend = friend_persons[0];// A
String[] persons = friend_persons[1].split(",");// I,K,C,B,G,F,H,O,D,

Arrays.sort(persons);

for(int i=0;i<persons.length-1;i++){
for(int j=i+1;j<persons.length;j++){
// 发出<人-人,好友>,这样,相同的"人-人"对的所有好友就会到同1个reduce中去
context.write(new Text(persons[i]+"-"+persons[j]), new Text(friend));
}
}
}
}

SharedFriendsStepTwoReducer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.matrix.sharedFriend;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SharedFriendsStepTwoReducer extends Reducer<Text, Text, Text, Text> {

@Override
protected void reduce(Text person_person, Iterable<Text> friends, Context context)
throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer();

for(Text friend : friends){
sb.append(friend).append(" ");
}

context.write(person_person, new Text(sb.toString()));
}

}

SharedFriendsStepTwoRunner.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.matrix.sharedFriend;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SharedFriendsStepTwoRunner {

// 把业务逻辑相关的信息(哪个是mapper,哪个是reducer,要处理的数据在哪里,输出的结果放哪里.....)
// 描述成一个job对象
// 把这个描述好的job提交给集群去运行
public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.230.10:8020");
conf.set("yarn.resourcemanager.hostname", "192.168.230.13");

FileSystem fs = FileSystem.get(//
new URI("hdfs://192.168.230.10:8020"), //
conf, //
"root"//
);

Path outdir = new Path("/usr/matrix/output/share2");
if (fs.exists(outdir)) {
fs.delete(outdir, true);
}

Job job = Job.getInstance(conf);
// 设置用户名称
job.setJobName("share");

// 指定我这个job所在jar包
job.setJarByClass(SharedFriendsStepTwoRunner.class);

job.setMapperClass(SharedFriendsStepTwoMapper.class);
job.setReducerClass(SharedFriendsStepTwoReducer.class);

// 设置我们的业务逻辑Mapper类的输出key和value的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 设置我们的业务逻辑Reducer类的输出key和value的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

// 指定要处理的数据所在的位置
// FileInputFormat.setInputPaths(job, "/usr/matrix/input/");
FileInputFormat.addInputPath(job, new Path("/usr/matrix/output/share"));
// 指定处理完成之后的结果所保存的位置
FileOutputFormat.setOutputPath(job, outdir);

// 向yarn集群提交这个job
boolean res = job.waitForCompletion(true);
if (res) {
System.out.println("程序运行成功!");
}

}
}

Markdown

本文作者 : Matrix
原文链接 : https://matrixsparse.github.io/2016/03/27/社交粉丝数据分析/
版权声明 : 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!

知识 & 情怀 | 二者兼得

微信扫一扫, 向我投食

微信扫一扫, 向我投食

支付宝扫一扫, 向我投食

支付宝扫一扫, 向我投食

留下足迹