Strom集群部署的基本流程

发布 : 2016-02-08 分类 : 大数据 浏览 :

1.集群部署的基本流程

1
2
3
集群部署的流程:下载安装包、解压安装包、修改配置文件、分发安装包、启动集群

192.168.230.11(node02)、192.168.230.12(node03)、192.168.230.13(node04)

2.Storm集群部署

1
2
3
4
5

部署依赖环境

Java 6+
Python 2.6.6+

2.1.下载安装包

2.2.解压安装包

1
2
[root@node02 software]# tar -zxvf apache-storm-0.10.0.tar.gz -C /opt/modules
[root@node02 modules]# mv apache-storm-0.10.0 storm-0.10.0

2.3.修改配置文件

1
2
3
4
5
6
7
[root@node02 modules]# cd storm-0.10.0/conf/
[root@node02 conf]# ll
total 8
-rw-r--r--. 1 502 games 1128 Oct 24 2015 storm_env.ini
-rw-r--r--. 1 502 games 1613 Oct 24 2015 storm.yaml
[root@node02 conf]# cp storm.yaml storm.yaml.bak
[root@node02 conf]# vi storm.yaml
1
2
3
4
5
6
storm.zookeeper.servers:
- "node02"
- "node03"
- "node04"

nimbus.host: "node02"

2.4.分发安装包

1
2
[root@node02 ~]# scp -r /opt/modules/storm-0.10.0/ root@node03:/opt/modules/storm-0.10.0
[root@node02 ~]# scp -r /opt/modules/storm-0.10.0/ root@node04:/opt/modules/storm-0.10.0

2.5.启动集群

2.5.1.启动Zookeeper

1
2
3
[root@node02 ~]# /opt/modules/zookeeper/bin/zkServer.sh start
[root@node03 ~]# /opt/modules/zookeeper/bin/zkServer.sh start
[root@node04 ~]# /opt/modules/zookeeper/bin/zkServer.sh start

2.5.2.启动niumbus

1
2
3
[root@node02 ~]# cd /opt/modules/storm-0.10.0/
[root@node02 storm-0.10.0]# ./bin/storm nimbus >> logs/nimbus.out 2>&1 &
[root@node02 storm-0.10.0]# tail -f logs/nimbus.log

Markdown

2.5.3.启动Storm Web UI

1
[root@node02 storm-0.10.0]# ./bin/storm ui >> logs/ui.out 2>&1 &

2.5.4.启动supervisor

1
2
[root@node02 storm-0.10.0]# ./bin/storm supervisor >> logs/supervisor.out 2>&1 &
[root@node02 storm-0.10.0]# tail -f logs/supervisor.log

Markdown

1
2
3
4
[root@node03 ~]# cd /opt/modules/storm-0.10.0/
[root@node03 storm-0.10.0]# ./bin/storm supervisor >> logs/supervisor.out 2>&1 &
[root@node04 ~]# cd /opt/modules/storm-0.10.0/
[root@node04 storm-0.10.0]# ./bin/storm supervisor >> logs/supervisor.out 2>&1 &

2.6.查看集群

访问nimbus.host:/8080,即可看到storm的ui界面。

在浏览器地址栏中输入:http://192.168.230.11:8080/index.html

Markdown

2.7.Storm单词技术案例(重点掌握)

2.7.1.功能说明

设计一个topology,来实现对文档里面的单词出现的频率进行统计。
整个topology分为三个部分:
    RandomSentenceSpout:数据源,在已知的英文句子中,随机发送一条句子出去。
    SplitSentenceBolt:负责将单行文本记录(句子)切分成单词
    WordCountBolt:负责对单词的频率进行累加
2.7.1.1.WordCountTopologMain.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.matrix.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import clojure.main;

public class WordCountTopologMain {

public static void main(String[] args){
// 1.准备一个TopologyBuilder
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("mySpout",new MySpout(),2);
topologyBuilder.setBolt("mybolt1",new MySplitBolt(),2).shuffleGrouping("mySpout");
topologyBuilder.setBolt("mybolt2",new MyCountBolt(),4).fieldsGrouping("mybolt1", new Fields("word"));

// 2.创建一个configuration,用来指定当前topology需要的worker的数量
Config config = new Config();
config.setNumWorkers(2);

// 3.提交任务 --两种模式 本地模式和集群模式
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("mywordcount", config, topologyBuilder.createTopology());
}

}
2.7.1.2.MySpout.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.matrix.storm;

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class MySpout extends BaseRichSpout{

SpoutOutputCollector collector;

// 初始化方法
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}

// storm框架在while(true) 调用nextTuple方法
public void nextTuple(){
collector.emit(new Values("i am lilei love hanmeimei"));
}

public void declareOutputFields(OutputFieldsDeclarer declarer){
declarer.declare(new Fields("love"));
}

}
2.7.1.3.MySplitBolt.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.matrix.storm;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MySplitBolt extends BaseRichBolt{

OutputCollector collector;

@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}

// 被storm框架 while(true) 循环调用 传入参数tuple
@Override
public void execute(Tuple input) {
String line = input.getString(0);
String[] arrWords = line.split(" ");
for(String word:arrWords){
collector.emit(new Values(word,1));
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","num"));
}
}
2.7.1.4.MyCountBolt.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.matrix.storm;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class MyCountBolt extends BaseRichBolt {

OutputCollector collector;
Map<String, Integer> map = new HashMap<String, Integer>();

// 此方法和Spout中的open方法类似,为Bolt提供了OutputCollector,用来从Bolt中发送Tuple。执行在execute方法之前
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}

// 这是Bolt中最关键的一个方法,对于Tuple的处理都可以放到此方法中进行。具体的发送也是通过emit方法来完成的。此时,有两种情况,一种是emit方法中有两个参数,另一个种是有一个参数。
// (1)emit有一个参数:此唯一的参数是发送到下游Bolt的Tuple,此时,由上游发来的旧的Tuple在此隔断,新的Tuple和旧的Tuple不再属于同一棵Tuple树。新的Tuple另起一个新的Tuple树。
// (2)emit有两个参数:第一个参数是旧的Tuple的输入流,第二个参数是发往下游Bolt的新的Tuple流。此时,新的Tuple和旧的Tuple是仍然属于同一棵Tuple树,即,如果下游的Bolt处理Tuple失败,则会向上传递到当前Bolt,当前Bolt根据旧的Tuple流继续往上游传递,申请重发失败的Tuple。保证Tuple处理的可靠性。

@Override
public void execute(Tuple input) {
String word = input.getString(0);
Integer num = input.getInteger(1);
System.out.println(Thread.currentThread().getId() + " word:" + word);
if (map.containsKey(word)) {
Integer count = map.get(word);
map.put(word, count + num);
} else {
map.put(word, num);
}
}

// 用于声明当前Bolt发送的Tuple中包含的字段
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// 不输出
}
}

2.7.2.Stream Grouping详解

1
2
3
4
5
6
7
(1)ShuffleGrouping:随机分组,随机分发Stream中的tuple,保证每个Bolt的Task接收Tuple数量大致一致;
(2)FieldsGrouping:按照字段分组,保证相同字段的Tuple分配到同一个Task中;
(3)AllGrouping:广播发送,每一个Task都会受到所有的Tuple;
(4)GlobalGrouping:全局分组,所有的Tuple都发送到同一个Task中,此时一般将当前Component的并发数目设置为1
(5)NonGrouping:不分组,和ShuffleGrouping类似,当前Task的执行会和它的被订阅者在同一个线程中执行;
(6)DirectGrouping:直接分组,直接指定由某个Task来执行Tuple的处理,而且,此时必须有emitDirect方法来发送;
(7)localOrShuffleGrouping:和ShuffleGrouping类似,若Bolt有多个Task在同一个进程中,Tuple会随机发给这些Task。
本文作者 : Matrix
原文链接 : https://matrixsparse.github.io/2016/02/08/Strom分布式集群部署/
版权声明 : 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!

知识 & 情怀 | 二者兼得

微信扫一扫, 向我投食

微信扫一扫, 向我投食

支付宝扫一扫, 向我投食

支付宝扫一扫, 向我投食

留下足迹