Storm之WordCount&计算拓扑
Storm是什么
1 | mysql:事务性系统,面临海量数据的尴尬 |
mysql
1 | TB级、PB级海量数据,使用mysql/分布式mysql/分布式数据库处理数据比较尴尬 |
ZooKeeper
1 | 分布式系统的协调,分布式锁,分布式选举->高可用HA架构,轻量级元数据存储 |
Hive
1 | Hadoop生态栈里面,做数据仓库的一个系统,高并发访问下,海量请求日志的批量统计分析,日报周报月报,接口调用情况,业务使用情况,等等 |
Spark
1 | 离线批量数据处理,比如从DB中一次性批量处理几亿数据,清洗和处理后写入Redis中供后续的系统使用 |
HBase
- 海量数据的
在线存储和简单查询,替代MySQL分库分表,提供更好的伸缩性 - java底层,对应的是海量数据,然后要做一些简单的
存储和查询,同时数据增多的时候要快速扩容
Elasticsearch
1 | 海量数据的复杂检索以及搜索引擎的构建,支撑有大量数据的各种企业信息化系统的搜索引擎,电商/新闻等网站的搜索引擎 |
hadoop
1 | 每天将所有数据收集起来,第二天凌晨时统一进行离线批处理 |
- 分布式存储
1 | 海量数据,分布式存储,每个服务器存储一部分的数据 |
- 分布式计算
1 | 分布式计算,每台服务器上面会计算一部分的数据,最终的计算结果,会汇总起来 |
Storm特点
支撑各种实时类的项目场景
- 实时处理消息以及更新数据库,基于最基础的实时计算语义和API(实时数据处理领域)
- 对
实时的数据流持续的进行查询或计算,同时将最新的计算结果持续的推送给客户端展示,同样基于最基础的实时计算语义和API(实时数据分析领域) - 对
耗时的查询进行并行化,基于DRPC,即分布式RPC调用,单表30天数据,并行化,每个进程查询一天数据,最后组装结果
高度的
可伸缩性
- 如果要扩容,直接加机器,调整storm计算作业的并行度就可以了,storm会自动部署更多的进程和线程到其他的机器上去,无缝快速扩容
数据不丢失的保证
- storm的消息可靠机制开启后,可以保证一条数据都不丢
超强的
健壮性
- 元数据全部放zookeeper,不在内存中,随便挂都不要紧
1 | storm比hadoop、spark等大数据类系统,健壮的多的多 |
使用的
便捷性
1 | 核心语义非常的简单,开发起来效率很高 |
Storm集群架构
1 | Nimbus、Supervisor、Zookeeper、Worker、Executor、Task |
- Nimbus:Storm集群架构中的主节点,负责元数据的维护、资源调度,提交实时计算作业的入口
Storm核心概念
1 | Topology、Spout、Bolt、Tuple、Stream |
- Topology:由一堆spout+bolt组成[实时计算作业],一个拓扑涵盖数据源获取/生产+数据处理的所有的代码逻辑
- Spout:数据源的一个代码组件,就是我们可以实现一个spout接口,写一个java类,在这个spout代码中,我们可以自己尝试去
数据源获取数据,比如说从kafka中消费数据 - bolt:一个业务处理的代码组件,spout会将数据传送给bolt,各种bolt还可以串联成一个
计算链条,java类实现了一个bolt接口 - tuple:就是一条数据,每条数据都会被封装在tuple中,在多个spout和bolt之间传递
- stream:就是一个流,抽象的概念,源源不断过来的tuple,就组成了一条数据流
Storm的并行度以及流分组
- 并行度:Worker->Executor->Task,即
Task - 流分组:Task与Task之间的
数据流向关系
1 | 在默认情况下,一个Executor只有一个task,Executor的数量跟task是相等的 |
1 | Shuffle Grouping:随机发射,负载均衡 |
1 | 发射的每条数据是一个tuple,每个tuple中有多个field作为字段 |
Storm分布式集群规划
1 | 192.168.31.231 matrix-cache01 |
Strom分布式集群部署
搭建ZK集群
安装Java JDK
解压缩,重命名,配置环境变量
1 | [root@matrix-cache01 ~]# tar -zxvf apache-storm-1.1.0.tar.gz -C /usr/local |
1 | [root@matrix-cache01 ~]# vi .bashrc |
修改storm配置文件
1 | [root@matrix-cache01 ~]# mkdir /var/storm |
1 | # storm.zookeeper.servers,表示Zookeeper集群地址 |
将storm文件目录/环境变量发送到另外两台机器上
1 | [root@matrix-cache01 ~]# scp -r /usr/local/storm/ root@192.168.31.232:/usr/local/storm |
启动storm集群和ui界面
1 | 一个节点,storm nimbus >/dev/null 2>&1 & |
1 | [root@matrix-cache01 ~]# storm nimbus >/dev/null 2>&1 & |
访问ui界面,8080端口
1 | http://192.168.31.233:8080/index.html |

Wordcount实现
1 | storm源源不断的接收到一些句子,实时的统计出句子中每个单词的出现次数 |




项目结构

搭建工程环境
1 | <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
编写代码
1 | package com.matrix.storm; |

基于集群计算拓扑
修改依赖
1 | <dependency> |
将Eclipse工程打包




在工程下查看Jar包

清空jar包

提交作业到storm集群
1 | storm jar storm-wordcount-0.0.1-SNAPSHOT.jar com.matrix.storm.WordCountTopology WordCountTopology |


在storm ui上观察storm作业的运行




kill掉storm作业
1 | storm kill WordCountTopology |


本文作者 : Matrix
原文链接 : https://matrixsparse.github.io/2017/07/18/Storm之WordCount&计算拓扑/
版权声明 : 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!
知识 & 情怀 | 二者兼得