Kafka开源消息系统分布式集群搭建
1.KafKa是什么
1 | 在流式计算中,KafKa一般用来缓存数据,Storm通过消费KafKa的数据进行计算 |
2.JMS是什么
2.1.JMS的基础
1 | JMS是什么:JMS是Java提供的一套技术规范 |

1 | jdk.kafka.activemq...... |
2.2.JMS消息传输模型
1 | 点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除) |

1 | queue.put(object) 数据生产 |
2.3.JMS核心组件
1 | Destination:消息发送的目的地,也就是前面说的Queue和Topic |

1 | StreamMessage:Java 数据流消息,用标准流操作来顺序的填充和读取。 |
2.4常见的类JMS消息服务器
2.4.1.JMS消息服务器ActiveMQ
1 | ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。 |
2.4.2.分布式消息中间件 Metamorphosis
1 | Metamorphosis(MetaQ)是一个高性能.高可用.可拓展的分布式消息中间件, |
2.4.3.分布式消息中间件 RocketMQ
1 | RockeMQ是一款分布式.队列模型的消息中间件,具有以下特点: |
2.4.4.其他MQ
1 | .NET消息中间件DotNetMQ |
3.为什么需要消息队列(重要)
1 | 消息系统的核心作用就是三点:解耦,异步和并行 |
3.1.用户注册的一般流程

1 | 问题:随着后端流程越来越多,每步流程都需要额外的耗费很多时间,从而会导致用户更长的等待延迟 |
3.2.用户注册的并行执行

1 | 问题:系统并行的发起了4个请求,4个请求中,如果某一个环节执行1分钟,其他环节再快,用户也需要等待1分钟。 |

3.3.用户注册的最终一致

1 | 1.保证主流程的正常执行.执行成功之后,发送MQ消息出去 |

4.Kafka核心组件
1 | Topic:消息根据Topic进行分类 |

5.Kafka集群搭建
5.1.下载解压
1 | 从官网下载Kafka,下载地址http://kafka.apache.org/downloads.html |

1 | 通过命令解压 |
5.2.重命名
1 | [root@node02 modules]# mv kafka_2.10-0.9.0.1 kafka |
5.3.配置启动
5.4.1.配置config/server.properties
1 | [root@node02 ~]# cd /opt/modules/kafka/ |
5.4.2.配置broker.id从0开始,后面其他节点配置1,2,3,4等等
1 | broker.id=0 |

5.4.3.事先启动zookeeper集群,这里配置zookeeper集群的地址
1 | zookeeper.connect=node02:2181,node03:2181,node04:2181 |

5.4.4.将集群分发到其他机器,比如
1 | [root@node02 ~]# scp -r /opt/modules/kafka/ root@node03:/opt/modules/kakfa |
5.4.5.这里注意修改broker.id.host.name
1 | [root@node03 ~]# cd /opt/modules/kafka/ |

1 | [root@node04 ~]# cd /opt/modules/kafka |

5.4.6.修改完配置以后在每个节点启动kafka的broker
1 | [root@node02 kafka]# ./bin/kafka-server-start.sh config/server.properties |

5.4.7.测试
5.4.7.1.新建一个topic
1 | [root@node02 kafka]# ./bin/kafka-topics.sh -zookeeper node02:2181,node03:2181,node04:2181 -topic test -replication-factor 2 -partitions 5 -create |

5.4.7.2.查看当前的topic
1 | [root@node02 kafka]# ./bin/kafka-topics.sh -zookeeper node02:2181,node03:2181,node04:2181 -list |

5.4.7.3.查看一个topic的分区及副本状态信息
1 | [root@node03 kafka]# ./bin/kafka-topics.sh --describe --zookeeper node02:2181,node03:2181,node04:2181 --topic test |

5.4.7.4.在一个节点创建一个provider
1 | 在provider输入信息,在consumer能接收到信息 |
1 | [root@node03 kafka]# ./bin/kafka-console-producer.sh --broker-list node02:9092,node03:9092,node04:9092 --topic test |

5.4.7.5.在另一个节点创建一个consumer
1 | [root@node04 kafka]# ./bin/kafka-console-consumer.sh --zookeeper node02:2181,node03:2181,node04:2181 --from-beginning -topic test |

5.5.Kafka生产者Java API

1 | package com.matrix.kafka; |

5.5.1.用一个comsumer从某一个topic中读取信息
1 | [root@node04 kafka]# ./bin/kafka-console-consumer.sh --zookeeper node02:2181,node03:2181,node04:2181 --from-beginning -topic test |

5.6.Kafka消费者Java API
1 | package com.matrix.kafka; |

5.7.关闭Kafka服务
1 | [root@node02 kafka]# ./bin/kafka-server-stop.sh |
本文作者 : Matrix
原文链接 : https://matrixsparse.github.io/2016/02/12/Kafka开源消息系统分布式集群搭建/
版权声明 : 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!
知识 & 情怀 | 二者兼得