Kafka Distributed Cluster Setup

Published: 2016-02-12 | Category: Big Data
(192.168.230.129) master

(192.168.230.130) slave1

(192.168.230.131) slave2

Configure a distributed Kafka cluster across the three hosts master, slave1, and slave2.

Prerequisite: ZooKeeper must already be configured and running on all three machines.

1. Extract the Kafka archive into the installation directory

[root@master software]# tar -zxf kafka_2.10-0.8.1.1.tgz -C /opt/modules

2. Edit server.properties under /opt/modules/kafka_2.10-0.8.1.1/config

[root@master ~]# cd /opt/modules/kafka_2.10-0.8.1.1/config
[root@master config]# vi server.properties

broker.id=1
zookeeper.connect=master:2181,slave1:2181,slave2:2181
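
For reference, the relevant entries on master might look like the fragment below. broker.id must be unique per broker; the port and log.dirs values shown are this file's defaults and are only an assumption here, since the post does not change them:

```
broker.id=1
port=9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=master:2181,slave1:2181,slave2:2181
```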

3. Copy the configured Kafka installation directory from master to slave1 and slave2

[root@master config]# scp -r /opt/modules/kafka_2.10-0.8.1.1/ root@slave1:/opt/modules/kafka_2.10-0.8.1.1/

[root@master config]# scp -r /opt/modules/kafka_2.10-0.8.1.1/ root@slave2:/opt/modules/kafka_2.10-0.8.1.1/

4. On slave1, edit /opt/modules/kafka_2.10-0.8.1.1/config/server.properties

[root@slave1 kafka_2.10-0.8.1.1]# vi config/server.properties
broker.id=2

5. On slave2, edit /opt/modules/kafka_2.10-0.8.1.1/config/server.properties

[root@slave2 kafka_2.10-0.8.1.1]# vi config/server.properties
broker.id=3

6. On each of the three hosts, set host.name in /opt/modules/kafka_2.10-0.8.1.1/config/server.properties

[root@master kafka_2.10-0.8.1.1]# vi config/server.properties
#host.name=localhost
# uncomment the line above and set it per host

On master:
host.name=master

[root@slave1 kafka_2.10-0.8.1.1]# vi config/server.properties

On slave1:
host.name=slave1

[root@slave2 kafka_2.10-0.8.1.1]# vi config/server.properties
On slave2:
host.name=slave2

7. Start the broker on each node

Start the broker on master:
[root@master kafka_2.10-0.8.1.1]# ./bin/kafka-server-start.sh config/server.properties

Start the broker on slave1:
[root@slave1 kafka_2.10-0.8.1.1]# ./bin/kafka-server-start.sh config/server.properties

Start the broker on slave2:
[root@slave2 kafka_2.10-0.8.1.1]# ./bin/kafka-server-start.sh config/server.properties

8. Create a topic in the Kafka cluster

Create a topic from master:
[root@master kafka_2.10-0.8.1.1]# ./bin/kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 3 --partitions 3 --topic chinesescore
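
With --replication-factor 3 --partitions 3, each of the three partitions is replicated on all three brokers. The round-robin style of replica assignment Kafka uses can be sketched as follows; this is a simplified illustration, not Kafka's actual code (Kafka also randomizes the starting broker so leadership spreads evenly):

```java
import java.util.*;

// Simplified sketch of round-robin replica assignment across brokers.
public class ReplicaAssignmentSketch {

    // For each partition, pick replicationFactor brokers, starting one
    // broker further along for each successive partition.
    static Map<Integer, List<Integer>> assign(List<Integer> brokerIds,
                                              int partitions,
                                              int replicationFactor) {
        Map<Integer, List<Integer>> assignment = new LinkedHashMap<>();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                replicas.add(brokerIds.get((p + r) % brokerIds.size()));
            }
            assignment.put(p, replicas);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Brokers 1, 2, 3 as configured above; 3 partitions, 3 replicas each.
        System.out.println(assign(Arrays.asList(1, 2, 3), 3, 3));
    }
}
```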

9. Verify that the topic was created successfully

[root@master kafka_2.10-0.8.1.1]# ./bin/kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181 --topic chinesescore

10. View the partition and replica status of a topic

[root@master kafka_2.10-0.8.1.1]# ./bin/kafka-topics.sh --describe --zookeeper master:2181,slave1:2181,slave2:2181 --topic chinesescore

11. Write messages to a topic with a producer

[root@master kafka_2.10-0.8.1.1]# ./bin/kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092 --topic chinesescore

12. Read messages from a topic with a consumer

[root@slave2 kafka_2.10-0.8.1.1]# ./bin/kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --from-beginning --topic chinesescore

13. View topics in ZooKeeper

[root@slave1 kafka_2.10-0.8.1.1]# zkCli.sh
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics

14. Write a Kafka producer in Java:

Note: configure the hostname-to-IP mappings in the hosts file of the machine running the client. On Windows:

C:\Windows\System32\drivers\etc\hosts
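
Using the addresses from the beginning of this post, the hosts file entries would be:

```
192.168.230.129 master
192.168.230.130 slave1
192.168.230.131 slave2
```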

ProducerDemo.java

package com.matrix.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerDemo {

    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("zk.connect", "master:2181,slave1:2181,slave2:2181");
        props.put("metadata.broker.list", "master:9092,slave1:9092,slave2:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        // Send business messages; in practice they could come from a file,
        // an in-memory database, or a socket.
        for (int i = 1; i <= 100; i++) {
            Thread.sleep(500);
            producer.send(new KeyedMessage<String, String>("chinesescore",
                    "say hi to someone, " + i + " times again!"));
        }
    }
}
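
The producer above sends messages without a key, so the 0.8 producer distributes them across partitions itself. When a KeyedMessage does carry a key, the default partitioner picks a partition by hashing it. The idea can be sketched as follows (a simplification of Kafka 0.8's default partitioner, not its exact code):

```java
// Sketch of key-based partitioning in the style of Kafka 0.8's default
// partitioner: a non-negative hash of the key modulo the partition count.
// Masking with 0x7fffffff avoids the Math.abs(Integer.MIN_VALUE) pitfall.
public class PartitionerSketch {

    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // "a".hashCode() == 97, so with 3 partitions this prints 1.
        System.out.println(partitionFor("a", 3));
    }
}
```

All messages with the same key therefore land in the same partition, which preserves per-key ordering.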

Use a console consumer to verify the messages written by the producer:

[root@master kafka_2.10-0.8.1.1]# ./bin/kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --from-beginning --topic chinesescore

15. Write a Kafka consumer in Java

ConsumerDemo.java

package com.matrix.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ConsumerDemo {

    private static final String topic = "chinesescore";
    private static final Integer threads = 1;

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "master:2181,slave1:2181,slave2:2181");
        props.put("group.id", "1111");
        props.put("auto.offset.reset", "smallest");

        ConsumerConfig config = new ConsumerConfig(props);
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, threads);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // One thread per stream; each prints the messages it receives.
        for (final KafkaStream<byte[], byte[]> kafkaStream : streams) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (MessageAndMetadata<byte[], byte[]> mm : kafkaStream) {
                        String msg = new String(mm.message());
                        System.out.println(msg);
                    }
                }
            }).start();
        }
    }
}

16. Stop the Kafka service

[root@master kafka_2.10-0.8.1.1]# ./bin/kafka-server-stop.sh
[root@slave1 kafka_2.10-0.8.1.1]# ./bin/kafka-server-stop.sh
[root@slave2 kafka_2.10-0.8.1.1]# ./bin/kafka-server-stop.sh
Author: Matrix
Original link: https://matrixsparse.github.io/2016/02/12/kafka分布式搭建/
License: Unless otherwise stated, all posts on this blog are licensed under CC BY-NC-SA 4.0. Please credit the source when reposting!
