Kafka安装配置

kafka介绍1

kafka.apachecn介绍

一、安装

kafka 可以通过官网下载

kafka 根据Scala版本不同,又分为多个版本,我不需要使用Scala,所以就下载官方推荐版本kafka_2.13-2.7.0.tgz

解压到opt目录下

1
2
3
4
5
6
7
wget https://downloads.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgz
tar -xzf kafka_2.13-3.2.1.tgz -C ~/
cd ~/
# 为了使用方便,可以创建软链接`kafka`
ln -s kafka_2.13-3.2.1 kafka
#mv kafka_2.13-2.7.0 kafka
cd kafka

kafka的安装需要依赖Zookeeper

二、默认的Zookeeper

kafka自带的Zookeeper程序使用bin/zookeeper-server-start.sh,以及bin/zookeeper-server-stop.sh来启动和停止Zookeeper

Zookeeper的配制文件是config/zookeeper.properties,可以修改其中的参数

(1)启动Zookeeper

1
2
3
4
5
# 修改默认端口
sed -i 's?clientPort=2181?clientPort=42181?' ~/kafka/config/zookeeper.properties
cat ~/kafka/config/zookeeper.properties
~/kafka/bin/zookeeper-server-start.sh -daemon ~/kafka/config/zookeeper.properties
tail -f ~/kafka/logs/zookeeper.out
  • -daemon参数,可以在后台启动Zookeeper,输出的信息在保存在执行目录的logs/zookeeper.out文件中。

问题:对于小内存的服务器,启动时有可能会出现如下错误。

os::commit_memory(0x00000000e0000000, 536870912, 0) failed; error='Not enough space' (errno=12)

可以通过修改bin/zookeeper-server-start.sh中的参数,来减少内存的使用,将-Xmx512M -Xms512M改小。

(2)关闭Zookeeper

1
~/kafka/bin/zookeeper-server-stop.sh -daemon ~/kafka/config/zookeeper.properties

三、Kafka配置

kafka的配置文件在config/server.properties文件中,主要修改参数如下,更具体的参数说明以后再整理下。

  1. broker.idkafka broker的编号,集群里每个brokerid需不同。默认从0开始。
  2. listeners是监听地址,需要提供外网服务的话,要设置本地的IP地址
  3. log.dirs是日志目录,需要设置
  4. 设置Zookeeper集群地址,我是在同一个服务器上搭建了kafkaZookeeper,所以填的本地地址
  5. num.partitions 为新建Topic的默认Partition数量,partition数量提升,一定程度上可以提升并发性,数值应该小于等于broker的数量
  6. 因为要创建kafka集群,所以kafka的所有文件都复制两份,配置文件做相应的修改,尤其是brokeridIP地址和log.dirs。分别创建软链接kafka1kafka2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 集群里每个`broker`的`id`需不同。默认从0开始。
broker.id=0
############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
# 监听的地址和端口,当前监听本机所有
listeners=EXTERNAL://0.0.0.0:49092
# 主机地址
advertised.listeners=PLAINTEXT://192.168.14.123:49092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
# 分区
num.partitions=1
############################# Zookeeper #############################
zookeeper.connect=localhost:42181

四、启动及停止Kafka

(1)启动kafka

1
2
~/kafka/bin/kafka-server-start.sh -daemon ~/kafka/config/server.properties
tail -f ~/kafka/logs/kafkaServer.out

-daemon 参数会将任务转入后台运行,输出日志信息将写入日志文件,查看 logs/kafkaServer.out,如果结尾输同started说明启动成功。

也可以用ps -ef|grep kafka命令,看有没有kafka的进程

kafka默认的 xmx xms都是 1G, 对于小内存的服务器,启动时有可能会出现如下错误。

os::commit_memory(0x00000000e0000000, 536870912, 0) failed; error='Not enough space' (errno=12)

可以通过修改bin/kafka-server-start.sh中的参数,来减少内存的使用,将配置改为-Xmx256M -Xms64M

(2)停止kafka

1
~/kafka/bin/kafka-server-stop.sh -daemon ~/kafka/config/server.properties

五、测试

前提:kafkaZookeeper已启动完成

1、创建topic

创建一个名为 testTopic,只有一个备份(replication-facto)和一个分区(partitions)。

1
~/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.14.123:49092 --replication-factor 1 --partitions 1 --topic test

成功提示 Created topic test.

2、查看主题

1
~/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.14.123:49092

成功提示

test

3、发送消息

Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。
运行producer(生产者),然后在控制台输入几条消息到服务器。

1
2
3
~/kafka/bin/kafka-console-producer.sh --broker-list 192.168.14.123:49092 --topic test
First message;
Second mssage;

4、接收消息

1
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.14.123:49092 --topic test --from-beginning

展示上面输入的两个消息,表示成功;

  • --from-beginning 是从开始的消息展示

5、查看特定主题的详细信息

1
~/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.14.123:49092 --describe  --topic test
1
2
Topic: test	PartitionCount: 1	ReplicationFactor: 1	Configs: 
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
  • “leader”是负责给定分区所有读写操作的节点。每个节点都是随机选择的部分分区的领导者。
  • “replicas”是复制分区日志的节点列表,不管这些节点是leader还是仅仅活着。
  • “isr”是一组“同步”replicas,是replicas列表的子集,它活着并被指到leader。

6、删除主题

1
2
~/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.14.123:49092 --delete  --topic test
~/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.14.123:49092

六、设置多个broker集群

一个 broker只是集群中的一个节点,id是这个节点的名称。

1、创建多个节点的配置

1
2
cp ~/kafka/config/server.properties ~/kafka/config/server-1.properties 
cp ~/kafka/config/server.properties ~/kafka/config/server-2.properties

修改每个配置的 idid 从0开始。

简单的配置如下

server-1.properties

1
2
3
4
5
6
7
8
9
10
# 修改节点名称
broker.id=1
# 监听的地址和端口,当前监听本机所有
listeners=PLAINTEXT://0.0.0.0:49093
# 主机地址
advertised.listeners=PLAINTEXT://192.168.14.123:49093
# 修改日志目录,防止覆盖日志
log.dir=/tmp/kafka-logs-1
# 配置 Zookeeper
zookeeper.connect=192.168.14.123:4181

server-2.properties

1
2
3
4
5
6
7
8
9
10
# 修改节点名称
broker.id=2
# 监听的地址和端口,当前监听本机所有
listeners=PLAINTEXT://0.0.0.0:49094
# 主机地址
advertised.listeners=PLAINTEXT://192.168.14.123:49094
# 修改日志目录,防止覆盖日志
log.dir=/tmp/kafka-logs-2
# 配置 Zookeeper
zookeeper.connect=192.168.14.123:4181

2、启动其它配置的服务

1
2
~/kafka/bin/kafka-server-start.sh -daemon ~/kafka/config/server-1.properties
~/kafka/bin/kafka-server-start.sh -daemon ~/kafka/config/server-2.properties

3、创建新的 topic

1
2
~/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.14.123:49092 --replication-factor 3 --partitions 1 --topic my-replicated-3-topic 
~/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.14.123:49092

成功提示:`Created topic my-replicated-3-topic.

这时候已经完成了一个集群的配置。

启动失败解决

如果报 replication 不够的错误,则是因为 server1server2可能启动失败了。

可以把 -daemon参数删掉,然后执行命令查看错误 bin/kafka-server-start.sh config/server-1.properties;

可以直接查看日志 logs/server.out;

4、查看主题列表

1
~/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.14.123:49092 --describe  --topic my-replicated-3-topic 
1
2
Topic: my-replicated-3-topic	PartitionCount: 1	ReplicationFactor: 3	Configs: 
Topic: my-replicated-3-topic Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2

leader: 1表示当前 leader在节点1上。

5、收发消息测试

接收消息

1
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.14.123:49092 --topic my-replicated-3-topic

发送消息

1
~/kafka/bin/kafka-console-producer.sh --broker-list 192.168.14.123:49092 --topic my-replicated-3-topic

6、测试集群 leader崩溃

停止 leader1

1
2
3
4
5
6
# 查询进程
ps -ef|grep server-1.properties
# 查询 kafka java 进程更方便
jps -mv |grep server-1.properties
# 杀掉进程
kill -9 xxx

查看主题

1
~/kafka/bin/kafka-topics.sh --describe --broker-list 192.168.14.123:49092 --topic my-replicated-3-topic

此时的 leader已经变为了 0,所以 broker 0 成为了 leader, broker1已经不在备份集合里了。

1
2
Topic: my-replicated-3-topic	PartitionCount: 1	ReplicationFactor: 3	Configs: 
Topic: my-replicated-3-topic Partition: 0 Leader: 0 Replicas: 1,0,2 Isr: 0,2

发送消息测试

1
~/kafka/bin/kafka-console-producer.sh --broker-list 192.168.14.123:49094 --topic my-replicated-3-topic

查询所有消息

1
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.14.123:49092 --topic my-replicated-3-topic --from-beginning

这个时候会发现所有的消息都没有丢

删除主题

1
2
~/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.14.123:49092 --delete  --topic my-replicated-3-topic
~/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.14.123:49092

七、安全

官方说明

Zookeeper配置

当前下载的kafka程序里自带Zookeeper,可以直接使用其自带的Zookeeper建立集群,也可以单独使用Zookeeper安装文件建立集群。

1. 单独使用Zookeeper安装文件建立集群

Zookeeper的安装及配置可以参考另一篇博客,里面有详细介绍

ZooKeeper的安装和配置过程

https://www.cnblogs.com/zhaoshizi/p/12105143.html

Kafka与Zookeeper的关系

一个典型的Kafka集群中包含若干Produce,若干broker(一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
img
1)Producer端直接连接broker.list列表,从列表中返回TopicMetadataResponse,该Metadata包含Topic下每个partition leader建立socket连接并发送消息.

2)Broker端使用zookeeper用来注册broker信息,以及监控partition leader存活性.

3)Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息。

Zookeeper作用:管理broker、consumer

创建Broker后,向zookeeper注册新的broker信息,实现在服务器正常运行下的水平拓展。具体的,通过注册watcher,获取partition的信息。

Topic的注册,zookeeper会维护topic与broker的关系,通/brokers/topics/topic.name节点来记录。

Producer向zookeeper中注册watcher,了解topic的partition的消息,以动态了解运行情况,实现负载均衡。Zookeepr不管理producer,只是能够提供当前broker的相关信息。

Consumer可以使用group形式消费kafka中的数据。所有的group将以轮询的方式消费broker中的数据,具体的按照启动的顺序。Zookeeper会给每个consumer group一个ID,即同一份数据可以被不同的用户ID多次消费。因此这就是单播与多播的实现。以单个消费者还是以组别的方式去消费数据,由用户自己去定义。Zookeeper管理consumer的offset跟踪当前消费的offset。

kafka使用ZooKeeper用于管理、协调代理。每个Kafka代理通过Zookeeper协调其他Kafka代理。
当Kafka系统中新增了代理或某个代理失效时,Zookeeper服务将通知生产者和消费者。生产者与消费者据此开始与其他代理协调工作。

Zookeeper在Kakfa中扮演的角色:Kafka将元数据信息保存在Zookeeper中,但是发送给Topic本身的数据是不会发到Zk上的

· kafka使用zookeeper来实现动态的集群扩展,不需要更改客户端(producer和consumer)的配置。broker会在zookeeper注册并保持相关的元数据(topic,partition信息等)更新。
· 而客户端会在zookeeper上注册相关的watcher。一旦zookeeper发生变化,客户端能及时感知并作出相应调整。这样就保证了添加或去除 broker时,各broker间仍能自动实现负载均衡。这里的客户端指的是Kafka的消息生产端(Producer)和消息消费端(Consumer)
· Broker端使用zookeeper来注册broker信息,以及监测partitionleader存活性.
· Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partitionleader建立socket连接,并获取消息.
· Zookeer和Producer没有建立关系,只和Brokers、Consumers建立关系以实现负载均衡,即同一个ConsumerGroup中的Consumers可以实现负载均衡(因为Producer是瞬态的,可以发送后关闭,无需直接等待)

本文地址: https://github.com/maxzhao-it/blog/post/4175/