Storm之WordCount&计算拓扑

发布 : 2017-07-18 分类 : 大数据 浏览 :

Storm是什么

1
2
3
mysql:事务性系统,面临海量数据的尴尬
hadoop:离线批处理
storm:实时计算,实时缓存热点数据统计->缓存预热->缓存热点数据自动降级

mysql

1
2
3
TB级、PB级海量数据,使用mysql/分布式mysql/分布式数据库处理数据比较尴尬
mysql分库分表就不太合适了,mysql分库分表扩容,还是比较麻烦的
mysql的like "%xxxx%",更加合适一些,性能更加好

ZooKeeper

1
2
3
4
分布式系统的协调,分布式锁,分布式选举->高可用HA架构,轻量级元数据存储
用java开发了分布式的系统架构,将整套系统拆分成多个部分,每个部分都会负责一些功能,互相之间需要交互和协调
服务A说,我在处理某件事情的时候,服务B你就别处理了
服务A说,我一旦发生了某些状况,希望服务B你立即感知到,然后做出相应的对策

Hive

1
Hadoop生态栈里面,做数据仓库的一个系统,高并发访问下,海量请求日志的批量统计分析,日报周报月报,接口调用情况,业务使用情况,等等

Spark

1
离线批量数据处理,比如从DB中一次性批量处理几亿数据,清洗和处理后写入Redis中供后续的系统使用

HBase

  • 海量数据的在线存储简单查询,替代MySQL分库分表,提供更好的伸缩性
  • java底层,对应的是海量数据,然后要做一些简单的存储查询,同时数据增多的时候要快速扩容

Elasticsearch

1
海量数据的复杂检索以及搜索引擎的构建,支撑有大量数据的各种企业信息化系统的搜索引擎,电商/新闻等网站的搜索引擎

hadoop

1
每天将所有数据收集起来,第二天凌晨时统一进行离线批处理
  • 分布式存储
1
海量数据,分布式存储,每个服务器存储一部分的数据
  • 分布式计算
1
分布式计算,每台服务器上面会计算一部分的数据,最终的计算结果,会汇总起来

Storm特点

支撑各种实时类的项目场景

  • 实时处理消息以及更新数据库,基于最基础的实时计算语义和API(实时数据处理领域)
  • 实时数据流持续的进行查询或计算,同时将最新的计算结果持续的推送给客户端展示,同样基于最基础的实时计算语义和API(实时数据分析领域)
  • 耗时的查询进行并行化,基于DRPC,即分布式RPC调用,单表30天数据,并行化,每个进程查询一天数据,最后组装结果

高度的可伸缩性

  • 如果要扩容,直接加机器,调整storm计算作业的并行度就可以了,storm会自动部署更多的进程和线程到其他的机器上去,无缝快速扩容

数据不丢失的保证

  • storm的消息可靠机制开启后,可以保证一条数据都不丢

超强的健壮性

  • 元数据全部放zookeeper,不在内存中,随便挂都不要紧
1
storm比hadoop、spark等大数据类系统,健壮的多的多

使用的便捷性

1
核心语义非常的简单,开发起来效率很高

Storm集群架构

1
Nimbus、Supervisor、Zookeeper、Worker、Executor、Task
  • Nimbus:Storm集群架构中的主节点,负责元数据的维护、资源调度,提交实时计算作业的入口

Storm核心概念

1
Topology、Spout、Bolt、Tuple、Stream
  • Topology:由一堆spout+bolt组成[实时计算作业],一个拓扑涵盖数据源获取/生产+数据处理的所有的代码逻辑
  • Spout:数据源的一个代码组件,就是我们可以实现一个spout接口,写一个java类,在这个spout代码中,我们可以自己尝试去数据源获取数据,比如说从kafka中消费数据
  • bolt:一个业务处理的代码组件,spout会将数据传送给bolt,各种bolt还可以串联成一个计算链条,java类实现了一个bolt接口
  • tuple:就是一条数据,每条数据都会被封装在tuple中,在多个spout和bolt之间传递
  • stream:就是一个流,抽象的概念,源源不断过来的tuple,就组成了一条数据流

Storm的并行度以及流分组

  • 并行度:Worker->Executor->Task,即Task
  • 流分组:Task与Task之间的数据流向关系
1
在默认情况下,一个Executor只有一个task,Executor的数量跟task是相等的
1
2
Shuffle Grouping:随机发射,负载均衡
Fields Grouping:根据某一个,或者某些个,fields,进行分组,那一个或者多个fields如果值完全相同的话,那么这些tuple,就会发送给下游bolt的其中固定的一个task
1
发射的每条数据是一个tuple,每个tuple中有多个field作为字段

Storm分布式集群规划

1
2
3
192.168.31.231 matrix-cache01
192.168.31.232 matrix-cache02
192.168.31.233 matrix-cache03

Strom分布式集群部署

搭建ZK集群

安装Java JDK

解压缩,重命名,配置环境变量

1
2
3
[root@matrix-cache01 ~]# tar -zxvf apache-storm-1.1.0.tar.gz -C /usr/local
[root@matrix-cache01 ~]# cd /usr/local/
[root@matrix-cache01 local]# mv apache-storm-1.1.0 storm
1
2
3
4
[root@matrix-cache01 ~]# vi .bashrc
export STORM_HOME=/usr/local/storm
export PATH=$PATH:$STORM_HOME/bin
[root@matrix-cache01 ~]# source .bashrc

修改storm配置文件

1
2
[root@matrix-cache01 ~]# mkdir /var/storm
[root@matrix-cache01 ~]# vi $STORM_HOME/conf/storm.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# storm.zookeeper.servers,表示Zookeeper集群地址
# 如果Zookeeper集群使用的不是默认端口,那么还需要配置storm.zookeeper.port
storm.zookeeper.servers:
- "matrix-cache01"
- "matrix-cache02"
- "matrix-cache03"

# storm.local.dir用于配置Storm存储少量文件的路径
storm.local.dir: "/var/storm"

# slots.ports,指定每个机器上可以启动多少个worker,一个端口号代表一个worker
supervisor.slots.ports:
- 6700
- 6701

# nimbus.seeds用于配置主控节点的地址,可以配置多个
# 增加了对nimbus节点的多节点配置,解决了nimbus单节点的弊端、
nimbus.seeds: ["matrix-cache01","matrix-cache02"]

将storm文件目录/环境变量发送到另外两台机器上

1
2
3
4
[root@matrix-cache01 ~]# scp -r /usr/local/storm/ root@192.168.31.232:/usr/local/storm
[root@matrix-cache01 ~]# scp -r /usr/local/storm/ root@192.168.31.233:/usr/local/storm
[root@matrix-cache01 ~]# scp -r .bashrc root@192.168.31.232:/root
[root@matrix-cache01 ~]# scp -r .bashrc root@192.168.31.233:/root

启动storm集群和ui界面

1
2
3
4
5
一个节点,storm nimbus >/dev/null 2>&1 &
三个节点,storm supervisor >/dev/null 2>&1 &
三个节点,storm ui >/dev/null 2>&1 &
在supervisor节点上,启动logviewer,就可以看到日志了
storm logviewer >/dev/null 2>&1 &
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
[root@matrix-cache01 ~]# storm nimbus >/dev/null 2>&1 &
[1] 982
[root@matrix-cache01 ~]# storm supervisor >/dev/null 2>&1 &
[2] 1068
[root@matrix-cache01 ~]# storm ui >/dev/null 2>&1 &
[3] 1631
[root@matrix-cache01 ~]# storm logviewer >/dev/null 2>&1 &
[4] 1716

[root@matrix-cache02 ~]# storm supervisor >/dev/null 2>&1 &
[1] 945
[root@matrix-cache02 ~]# storm ui >/dev/null 2>&1 &
[2] 1142
[root@matrix-cache02 ~]# storm logviewer >/dev/null 2>&1 &
[3] 1227

[root@matrix-cache03 ~]# storm supervisor >/dev/null 2>&1 &
[1] 950
[root@matrix-cache03 ~]# storm ui >/dev/null 2>&1 &
[2] 1203
[root@matrix-cache03 ~]# storm logviewer >/dev/null 2>&1 &
[3] 1242

访问ui界面,8080端口

1
http://192.168.31.233:8080/index.html

Wordcount实现

1
storm源源不断的接收到一些句子,实时的统计出句子中每个单词的出现次数

项目结构

搭建工程环境

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.matrix.storm</groupId>
<artifactId>storm-wordcount</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>storm-wordcount</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.6</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.1</version>
</dependency>
</dependencies>

<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>test/main/java</testSourceDirectory>

<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.sf</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.dsa</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>META-INF/*.rsa</exclude>
<exclude>META-INF/*.EC</exclude>
<exclude>META-INF/*.ec</exclude>
<exclude>META-INF/MSFTSIG.SF</exclude>
<exclude>META-INF/MSFTSIG.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>java</executable>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<classpathScope>compile</classpathScope>
<mainClass>com.matrix.storm.WordCountTopology</mainClass>
</configuration>
</plugin>
</plugins>
</build>
</project>

编写代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
package com.matrix.storm;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* 单词计数的拓扑
*
* @author matrix
*
*/
public class WordCountTopology {

/**
* spout,继承一个基类,实现接口,这个里面主要是负责从数据源获取数据
*
* 这个Demo就不从外部获取数据了,在内部不断发射些句子
*
* @author matrix
*
*/
public static class RandomSentenceSpout extends BaseRichSpout {

private static final long serialVersionUID = 3766116289441282097L;

private static final Logger LOGGER = LoggerFactory.getLogger(RandomSentenceSpout.class);

private SpoutOutputCollector collector;

private Random random;

/**
* open方法,对Spout进行初始化
*
* 创建一个线程池或者创建一个数据库连接池,或者构造一个httpClient
*/
@SuppressWarnings("rawtypes")
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
// 在初始化的时候,会传入进来一个东西,叫做SpoutOutputCollector
// 这个SpoutOutputCollector就是用来发射数据出去的
this.collector = collector;
// 构造一个随机数生产对象
this.random = new Random();

}

/**
* nextTuple方法
*
* 这个Spout类,最终会运行在task中,某个worker进程的某个executor线程内部的某个task中
* 那个task会负责去不断的无限循环调用nextTuple()方法 无限循环调用,可以不断发射最新的数据出去,形成一个数据流
*/
public void nextTuple() {
Utils.sleep(100);
String[] sentences = new String[] { "the cow jumped over the moon", "an apple a day keeps the doctor away",
"four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
// 随机拿句子
String sentence = sentences[random.nextInt(sentences.length)];
LOGGER.info("【发射句子】sentence=" + sentence);
// 这个values,可以认为就是构建一个tuple
// tuple是最小的数据单位,无限个tuple组成的流就是一个stream
collector.emit(new Values(sentence));
}

/**
* 定义发射出去的tuple,每个field的名称
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}
}

/**
* bolt,直接继承一个BaseRichBolt积累
*
* 实现里面的所有方法,每个Bolt,同样是发送到Worker某个Executor的task里面去运行
*
* @author matrix
*
*/
public static class SplitSentence extends BaseRichBolt {

private static final long serialVersionUID = 7909901426602710334L;

private OutputCollector collector;

/**
*
* 对于bolt来说,prepare方法
*
* OutputCollector,这个也是Bolt的这个tuple发射器
*/
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}

/**
* execute方法
*
* 每次接收到一条数据后,就会交给这个executor方法来执行
*
*/
public void execute(Tuple tuple) {
String sentence = tuple.getStringByField("sentence");
String[] words = sentence.split(" ");
for (String word : words) {
collector.emit(new Values(word));
}
}

/**
* 定义发射出去的tuple,每个field的名称
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

public static class WordCount extends BaseRichBolt {

private static final long serialVersionUID = 5809317408276439826L;

private static final Logger LOGGER = LoggerFactory.getLogger(WordCount.class);

private OutputCollector collector;
private Map<String, Long> wordCounts = new HashMap<String, Long>();

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}

public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");

Long count = wordCounts.get(word);
if (count == null) {
count = 0L;
}
count++;

wordCounts.put(word, count);
LOGGER.info("【单词计数】" + word + "出现的次数是" + count);
collector.emit(new Values(word, count));
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}

public static void main(String[] args) {
// 在main方法中,会去将spout和bolts组合起来,构建成一个拓扑
TopologyBuilder builder = new TopologyBuilder();

// 第一个参数,给这个spout设置一个名字
// 第二个参数,创建一个spout对象
// 第三个参数,设置spout的executor有几个
builder.setSpout("RandomSentence", new RandomSentenceSpout(), 2);
builder.setBolt("SplitSentence", new SplitSentence(), 5).setNumTasks(10).shuffleGrouping("RandomSentence");
// 这个很重要,相同的单词,从SplitSentence发射出来时,一定会进入到下游指定的同一个task中
// 只有这样子,才能准确的统计出每个单词的数量
// 比如你有个单词,hello,下游task1接收到3个hello,task2接收到2个hello
// 5个hello,全都进入一个task
builder.setBolt("WordCount", new WordCount(), 10).setNumTasks(20).fieldsGrouping("SplitSentence",
new Fields("word"));

Config config = new Config();

// 说明是在命令行执行,打算提交到storm集群上去
if (args != null && args.length > 0) {
config.setNumWorkers(3);
try {
StormSubmitter.submitTopology(args[0], config, builder.createTopology());
} catch (Exception e) {
e.printStackTrace();
}
} else {
// 说明是在eclipse里面本地运行
config.setMaxTaskParallelism(20);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("WordCountTopology", config, builder.createTopology());

Utils.sleep(60000);

cluster.shutdown();
}
}
}

基于集群计算拓扑

修改依赖

1
2
3
4
5
6
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
<scope>provided</scope>
</dependency>

将Eclipse工程打包

在工程下查看Jar包

清空jar包

提交作业到storm集群

1
storm jar storm-wordcount-0.0.1-SNAPSHOT.jar com.matrix.storm.WordCountTopology WordCountTopology

在storm ui上观察storm作业的运行

kill掉storm作业

1
storm kill WordCountTopology

本文作者 : Matrix
原文链接 : https://matrixsparse.github.io/2017/07/18/Storm之WordCount&计算拓扑/
版权声明 : 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!

知识 & 情怀 | 二者兼得

微信扫一扫, 向我投食

微信扫一扫, 向我投食

支付宝扫一扫, 向我投食

支付宝扫一扫, 向我投食

留下足迹