目录
Flink vs Spark
关于Flink
Flink 的基本数据模型是数据流,及事件(Event) 的序列。数据流作为数据的基本模型可能没有表或者数据块直观熟悉,但是可以证明是完全等效的。
- 流可以是
无边界的无限流,即一般意义上的流处理。
- 也可以是
有边界的有限流,这样就是批处理。
Flink 用数据流上的变换(算子)来描述数据处理。每个算子生成一个新的数据流。在算子,DAG,和上下游算子链接(chaining) 这些方面,和 Spark 大致等价。Flink 的节点(vertex)大致相当于 Spark 的阶段(stage),划分也会和上图的 Spark DAG 基本一样。

在 DAG 的执行上,Spark 和 Flink 有一个比较显著的区别。 在 Flink 的流执行模式中,一个事件在一个节点处理完后的输出就可以发到下一个节点立即处理。这样执行引擎并不会引入额外 的延迟。与之相应的,所有节点是需要同时运行的。而 Spark 的 micro batch 和一般的 batch 执行一样,处理完上游的 stage 得到输出之后才开始下游的 stage。
在 Flink 的流执行模式中,为了提高效率也可以把多个事件放在一起传输或者计算。但这完全是执行时的优化,可以在每个算子独立决定,也不用像 RDD 等批处理模型中一样和数据集边界 绑定,可以做更加灵活的优化同时可以兼顾低延迟需求。
Flink 使用异步的 checkpoint 机制来达到任务状态的可恢复性,以保证处理的一致性,所以在处理的主流程上可以做到数据源和输出之间数据完全不用落盘,达到更高的性能和更低的延迟。

数据处理场景
除了批处理之外,Spark 还支持实时数据流处理,交互式查询,和机器学习,图计算等。
实时数据流处理和批处理主要区别就是对低延时的要求。Spark 因为 RDD 是基于内存的,可以比较容易切成较小的块来处理。如果能对这些小块处理得足够快,就能达到低延时的效果。
交互式查询场景,如果数据能全在内存,处理得足够快的话,就可以支持交互式查询。
机器学习和图计算其实是和前几种场景不同的 RDD 算子类型。Spark 提供了库来支持常用 的操作,用户或者第三方库也可以自己扩展。值得一提的是,Spark 的 RDD 模型和机器学 习模型训练的迭代计算非常契合,从一开始就在有的场景带来了非常显著的性能提升。
从这些可以看出来,比起 Hadoop MapReduce, Spark 本质上就是基于内存的更快的批处理。 然后用足够快的批处理来实现各种场景。
下载VMware Fusion
安装VMWare Fusion成功后,Mac OS会新增两张网卡vmnet1以及vmnet8,其中vmnet1是Host-only模式,vmnet8是NAT模式,这里选择vmnet8使用NAT进行网络设置。

1 2 3 4 5 6
| vmnet1: flags=8863<UP,BROADCAST,SMART,RUNNING,SIMPLEX,MULTICAST> mtu 1500 ether 00:50:56:c0:00:01 inet 192.168.246.1 netmask 0xffffff00 broadcast 192.168.246.255 vmnet8: flags=8863<UP,BROADCAST,SMART,RUNNING,SIMPLEX,MULTICAST> mtu 1500 ether 00:50:56:c0:00:08 inet 192.168.239.1 netmask 0xffffff00 broadcast 192.168.239.255
|
集群规划
- 192.168.239.180 flink01
- 192.168.239.181 flink02
- 192.168.239.182 flink03
配置网络
动态分配一个ip地址
1
| vi /etc/sysconfig/network-scripts/ifcfg-enp0s3
|
重新启动网络
查看ip地址
设置静态ip地址
1
| vi /etc/sysconfig/network-scripts/ifcfg-ens33
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| TYPE=Ethernet BOOTPROTO=static DEFROUTE=yes PEERDNS=yes PEERROUTES=yes IPV4_FAILURE_FATAL=no IPV6INIT=yes IPV6_AUTOCONF=yes IPV6_DEFROUTE=yes IPV6_PEERDNS=yes IPV6_PEERROUTES=yes IPV6_FAILURE_FATAL=no IPV6_ADDR_GEN_MODE=stable-privacy NAME=enp0s3 DEVICE=enp0s3 ONBOOT=yes IPADDR=192.168.239.180 NETMASK=255.255.255.0 GATEWAY=192.168.239.2
|
配置DNS
检查NetManager的状态
1
| systemctl status NetworkManager.service
|
检查NetManager管理的网络接口
检查NetManager管理的网络连接
设置dns
1
| nmcli con mod ens33 ipv4.dns "114.114.114.114 8.8.8.8"
|
让dns配置生效
设置静态、瞬态或灵活主机名,分别使用–static,–transient或–pretty选项
1 2 3 4 5 6 7
| [root@localhost ~] [root@localhost ~] flink01 [root@localhost ~] flink01 [root@localhost ~] flink01
|
重启CentOS 7之后,查看主机名
1 2
| [root@flink01 ~] flink01
|
三台机器都配置hosts(配置本机的hostname到ip地址的映射)
1 2 3
| 192.168.239.180 flink01 192.168.239.181 flink02 192.168.239.182 flink03
|
关闭防火墙
1 2 3
| systemctl status firewalld.service systemctl stop firewalld.service systemctl disable firewalld.service
|
关闭windows的防火墙
后面要搭建集群,有的大数据技术的集群之间,在本地你给了防火墙的话,可能会没有办法互相连接,会导致搭建失败
配置yum
1 2 3
| yum clean all yum makecache yum install wget
|
安装JDK
将jdk-8u131-linux-x64.rpm通过WinSCP上传到虚拟机中
1
| scp -C -r jdk-8u131-linux-x64.rpm root@192.168.0.180:/root
|
安装JDK
1
| rpm -ivh jdk-8u131-linux-x64.rpm
|
配置jdk相关的环境变量
1 2 3 4
| [root@flink01 ~] export JAVA_HOME=/usr/java/latest export PATH=$PATH:$JAVA_HOME/bin [root@flink01 ~]
|
测试jdk安装是否成功
配置5台CentOS为ssh免密码互相通信
在5台机器上配置对本机的ssh免密码登录
生成本机的公钥,过程中不断敲回车即可,ssh-keygen命令默认会将公钥放在/root/.ssh目录下
配置本机免密码登录
1 2
| cd /root/.ssh cp id_rsa.pub authorized_keys
|
将公钥复制为authorized_keys文件,此时使用ssh连接本机就不需要输入密码了
配置五台机器互相之间的ssh免密码登录
使用ssh-copy-id -i hostname命令将本机的公钥拷贝到指定机器的authorized_keys文件中
1 2 3 4 5 6 7 8
| [root@flink01 ~] [root@flink01 ~]
[root@flink02 ~] [root@flink02 ~]
[root@flink03 ~] [root@flink03 ~]
|
同步集群时间
使用yum安装ntpdate
设置定时同步时间
1 2
| crontab -e */10 * * * * /usr/sbin/ntpdate time.nist.gov
|
下载Flink
下载Flink安装文件
1
| wget http://apache.01link.hk/flink/flink-1.7.1/flink-1.7.1-bin-scala_2.11.tgz
|
解压压缩文件
目录重命名
编辑Flink配置文件
编辑conf/flink-conf.yaml
配置Master地址
1
| jobmanager.rpc.address: 192.168.239.180
|
编辑conf/slaves
配合slave地址
1 2
| 192.168.239.181 192.168.239.182
|
配置Flink环境变量
1 2 3 4
| vi .bashrc export FLINK_HOME=/data/flink export PATH=$PATH:$FLINK_HOME/bin source .bashrc
|
将flink目录发送到flink02、flink03机器
1 2 3 4 5
| scp -r /data/flink root@flink02:/data/flink scp -r /data/flink root@flink03:/data/flink
scp -r /root/.bashrc root@flink02:/root scp -r /root/.bashrc root@flink03:/root
|
启动Flink
启动Flink
1 2 3 4 5
| [root@flink01 ~] Starting cluster. Starting standalonesession daemon on host flink01. Starting taskexecutor daemon on host flink02. Starting taskexecutor daemon on host flink03.
|
查看Flink进程
flink01进程
1 2 3 4
| [root@flink01 flink] 2726 StandaloneSessionClusterEntrypoint 2780 Jps [root@flink01 flink]
|
flink02进程
1 2 3
| [root@flink02 ~] 2604 Jps 2558 TaskManagerRunner
|
flink03进程
1 2 3
| [root@flink03 ~] 2605 Jps 2559 TaskManagerRunner
|
查看Flink WebUI
1
| http://192.168.239.180:8081/
|

暂停Flink
1 2 3 4
| [root@flink01 ~] Stopping taskexecutor daemon (pid: 2558) on host flink02. Stopping taskexecutor daemon (pid: 2559) on host flink03. Stopping standalonesession daemon (pid: 2726) on host flink01.
|
分步启动Master、Slave进程
启动JobManager
1
| ./bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all
|
启动TaskManager
1
| ./bin/taskmanager.sh start|start-foreground|stop|stop-all
|
Flink流处理 Demo
FlinkSQL Demo