1. Basic Cluster Deployment Workflow
Cluster deployment steps: download the installation package, extract it, edit the configuration files, distribute the package to the other nodes, and start the cluster.
The nodes used below: 192.168.230.11 (node02), 192.168.230.12 (node03), 192.168.230.13 (node04).
2. Storm Cluster Deployment
First install the runtime dependencies on every node: Java 6+ and Python 2.6.6+.
2.1. Download the Installation Package
2.2. Extract the Installation Package
```shell
# Paths and archive name assumed; adjust to where the package was downloaded.
[root@node02 software]# tar -zxvf apache-storm-0.10.0.tar.gz -C /opt/modules/
[root@node02 modules]# mv apache-storm-0.10.0 storm-0.10.0
```
2.3. Edit the Configuration Files
```shell
[root@node02 modules]# cd storm-0.10.0/conf
[root@node02 conf]# ls -l
total 8
-rw-r--r--. 1 502 games 1128 Oct 24 2015 storm_env.ini
-rw-r--r--. 1 502 games 1613 Oct 24 2015 storm.yaml
[root@node02 conf]# vi storm.yaml
```
In storm.yaml, configure the Zookeeper ensemble and the nimbus host:

```yaml
storm.zookeeper.servers:
  - "node02"
  - "node03"
  - "node04"

nimbus.host: "node02"
```
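The two settings above are the minimum; supervisor nodes also need worker slot ports defined. A sketch of a fuller storm.yaml (the ports shown are Storm's defaults, assumed here rather than taken from this cluster):

```yaml
storm.zookeeper.servers:
  - "node02"
  - "node03"
  - "node04"

nimbus.host: "node02"

# One worker slot per port on each supervisor node.
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703
```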
2.4. Distribute the Installation Package
```shell
# Target path assumed to mirror node02's layout.
[root@node02 ~]# scp -r /opt/modules/storm-0.10.0 node03:/opt/modules/
[root@node02 ~]# scp -r /opt/modules/storm-0.10.0 node04:/opt/modules/
```
2.5. Start the Cluster
2.5.1. Start Zookeeper
```shell
# Assumes zkServer.sh is on PATH on each node.
[root@node02 ~]# zkServer.sh start
[root@node03 ~]# zkServer.sh start
[root@node04 ~]# zkServer.sh start
```
2.5.2. Start nimbus
```shell
[root@node02 ~]# cd /opt/modules/storm-0.10.0
[root@node02 storm-0.10.0]# bin/storm nimbus > /dev/null 2>&1 &
[root@node02 storm-0.10.0]# jps
```

2.5.3. Start the Storm Web UI
```shell
[root@node02 storm-0.10.0]# bin/storm ui > /dev/null 2>&1 &
```
2.5.4. Start the supervisors
```shell
[root@node02 storm-0.10.0]# bin/storm supervisor > /dev/null 2>&1 &
[root@node02 storm-0.10.0]# jps
```

```shell
[root@node03 ~]# cd /opt/modules/storm-0.10.0
[root@node03 storm-0.10.0]# bin/storm supervisor > /dev/null 2>&1 &
[root@node04 ~]# cd /opt/modules/storm-0.10.0
[root@node04 storm-0.10.0]# bin/storm supervisor > /dev/null 2>&1 &
```
2.6. View the Cluster
Visit http://nimbus.host:8080 to open the Storm UI.
For this cluster, enter http://192.168.230.11:8080/index.html in the browser address bar.

2.7. Storm Word Count Example (Key Material)
2.7.1. Functional Description
Design a topology that counts how often each word occurs in a stream of documents.
The topology has three parts:
RandomSentenceSpout (MySpout below): the data source; repeatedly emits one of a set of known English sentences at random.
SplitSentenceBolt (MySplitBolt below): splits each line of text (a sentence) into words.
WordCountBolt (MyCountBolt below): accumulates a running count for each word.
2.7.1.1.WordCountTopologMain.java
```java
package com.matrix.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordCountTopologMain {

    public static void main(String[] args) throws Exception {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("mySpout", new MySpout(), 2);
        topologyBuilder.setBolt("mybolt1", new MySplitBolt(), 2).shuffleGrouping("mySpout");
        topologyBuilder.setBolt("mybolt2", new MyCountBolt(), 4)
                .fieldsGrouping("mybolt1", new Fields("word"));

        Config config = new Config();
        config.setNumWorkers(2);

        if (args != null && args.length > 0) {
            // Submit to the real cluster when a topology name is given.
            StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
        } else {
            // Otherwise run in-process, which is convenient for local testing.
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("mywordcount", config, topologyBuilder.createTopology());
        }
    }
}
```
2.7.1.2.MySpout.java
```java
package com.matrix.storm;

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class MySpout extends BaseRichSpout {

    SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    // Called in a loop by Storm; emits the same test sentence over and over.
    @Override
    public void nextTuple() {
        collector.emit(new Values("i am lilei love hanmeimei"));
    }

    // The single output field; the downstream bolt reads it by position (index 0).
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("love"));
    }
}
```
2.7.1.3.MySplitBolt.java
```java
package com.matrix.storm;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MySplitBolt extends BaseRichBolt {

    OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    // Split each incoming sentence on spaces and emit one (word, 1) tuple per word.
    @Override
    public void execute(Tuple input) {
        String line = input.getString(0);
        String[] arrWords = line.split(" ");
        for (String word : arrWords) {
            collector.emit(new Values(word, 1));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "num"));
    }
}
```
2.7.1.4.MyCountBolt.java
```java
package com.matrix.storm;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class MyCountBolt extends BaseRichBolt {

    OutputCollector collector;
    // Per-task running totals; fieldsGrouping on "word" guarantees that all
    // tuples for a given word arrive at the same task, so the counts are correct.
    Map<String, Integer> map = new HashMap<String, Integer>();

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getString(0);
        Integer num = input.getInteger(1);
        System.out.println(Thread.currentThread().getId() + " word:" + word);
        if (map.containsKey(word)) {
            map.put(word, map.get(word) + num);
        } else {
            map.put(word, num);
        }
    }

    // Terminal bolt: nothing is emitted downstream, so no output fields are declared.
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
```
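The split-and-count pipeline above can be sanity-checked without a Storm cluster. The sketch below mirrors the logic of MySplitBolt and MyCountBolt in plain Java (the class and method names here are illustrative, not part of the Storm API):

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {

    // Mirrors MySplitBolt + MyCountBolt: split each sentence on spaces,
    // then accumulate a per-word count in a HashMap.
    public static Map<String, Integer> count(String[] sentences) {
        Map<String, Integer> counts = new HashMap<>();
        for (String sentence : sentences) {
            for (String word : sentence.split(" ")) {
                counts.put(word, counts.getOrDefault(word, 0) + 1);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> c = count(new String[] {
            "i am lilei love hanmeimei",
            "i am lilei love hanmeimei"
        });
        System.out.println(c.get("lilei")); // prints 2: each word occurs twice
    }
}
```

In the real topology the same accumulation happens incrementally, one tuple at a time, across four MyCountBolt tasks.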
2.7.2. Stream Groupings Explained
(1) ShuffleGrouping: random grouping; tuples are distributed randomly so that each task of the bolt receives roughly the same number of tuples.
(2) FieldsGrouping: group by field; tuples with the same value in the grouping field are always delivered to the same task.
(3) AllGrouping: broadcast; every task receives every tuple.
(4) GlobalGrouping: all tuples go to a single task; the component's parallelism is usually set to 1 in this case.
(5) NonGrouping: no grouping; similar to ShuffleGrouping, except the task may execute in the same thread as the component it subscribes to.
(6) DirectGrouping: the emitter decides which task processes each tuple, and tuples must be sent with emitDirect.
(7) LocalOrShuffleGrouping: like ShuffleGrouping, but if the target bolt has tasks in the same worker process, tuples are shuffled only among those local tasks.
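The FieldsGrouping guarantee in (2) boils down to hashing the grouping field consistently across tuples. A minimal illustration of the routing semantics (Storm's actual internal hashing differs; the names here are illustrative):

```java
public class FieldsGroupingSketch {

    // Route a tuple by hashing the grouping field modulo the task count.
    // Every tuple carrying the same "word" therefore lands on the same task,
    // which is what makes per-word counting in MyCountBolt correct.
    public static int taskFor(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int tasks = 4; // matches the 4 MyCountBolt tasks in the topology above
        System.out.println(taskFor("love", tasks) == taskFor("love", tasks)); // prints true
    }
}
```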