这里使用 SpringBoot 2.4.2
+ kafka 2.7.0
开发配置测试 依赖 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 <dependency > <groupId > org.springframework.kafka</groupId > <artifactId > spring-kafka</artifactId > </dependency > <dependency > <groupId > org.apache.kafka</groupId > <artifactId > kafka-clients</artifactId > <version > 2.6.1</version > <scope > compile</scope > </dependency > <dependency > <groupId > org.apache.kafka</groupId > <artifactId > kafka-streams</artifactId > <version > 2.6.1</version > <scope > compile</scope > <optional > true</optional > </dependency >
配置 Kafka在属性文件格式中使用键值对进行配置。这些值可以通过文件或编程方式提供。 必备配置如下:
broker.id=0
log.dirs=/tmp/kafka-logs
zookeeper.connect=hostname1:port1,hostname2:port2,hostname3:port3
auto.create.topics.enable=true 是否允许在服务器上自动创建topic
application.yml
配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 spring kafka: bootstrap-servers: 127.0 .0 .1 :9092 producer: retries: 0 acks: 1 batch-size: 10240 properties.linger.ms: 0 buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: properties: group: id: defaultConsumerGroup enable-auto-commit: true auto: commit: interval: ms: 1000 auto-offset-reset: latest properties.session.timeout.ms: 120000 properties.request.timeout.ms: 180000 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer listener: missing-topics-fatal: false
创建 topic 的类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Configuration public class DefaultTestKafkaInitialConfiguration { public static final String TEST_TOPIC = "test-topic" ; @Bean public NewTopic initialTopic () { return new NewTopic (TEST_TOPIC, 1 , (short ) 1 ); } }
简单的生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public interface IKafkaService { void sendMessage (String topic, String message) ; } @Slf4j @Service public class KafkaServiceImpl implements IKafkaService { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; @Override public void sendMessage (String topic, String message) { kafkaTemplate.send(topic, message); } }
简单的消费者 1 2 3 4 5 6 7 8 9 10 11 12 @Slf4j @Component public class KafkaConsumer { @KafkaListener(topics = {DefaultTestKafkaInitialConfiguration.TEST_TOPIC}) public void onMessage1 (ConsumerRecord<?, ?> record) { log.debug("消费:{}-{}-{}" , record.topic(), record.partition(), record.value()); } }
简单测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Slf4j @RunWith(SpringRunner.class) @SpringBootTest(classes = start.DemoApplication.class) public class KafkaTest { @Autowired private IKafkaService kafkaService; @Test public void sendMessage () { String message = "这是一个简单的消息!" ; log.debug("发送消息:{}-{}" , DefaultTestKafkaInitialConfiguration.TEST_TOPIC, message); kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC, message); log.debug("发送完成" ); } }
输出结果
1 2 3 2021-01-26 15:40:33 DEBUG (KafkaTest.java:31)- 发送消息:test-topic-这是一个简单的消息! 2021-01-26 15:49:32 DEBUG (KafkaTest.java:33)- 发送完成 2021-01-26 15:49:48 DEBUG (KafkaConsumer.java:24)- 消费:test-topic-0-这是一个简单的消息!
带回调的生产者 kafkaTemplate
提供了一个回调方法addCallback
,可以在回调方法中监控消息是否发送成功,或失败时做补偿处理 有集中写法,这里简单的介绍两种 lambda
和接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @Slf4j @Service("kafkaService") public class KafkaServiceImpl implements IKafkaService { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; @Override public void sendMessage (String topic, String message) { kafkaTemplate.send(topic, message).addCallback(success -> { if (success == null ) { log.debug("消息发送成功,响应数据不存在" ); } else { RecordMetadata recordMetadata = success.getRecordMetadata(); log.debug("消息发送成功:topic:{} partition:{} offset:{}" , recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()); } }, throwable -> { log.warn("消息发送失败:topic:{} message{} error:{}" , topic, message, throwable.getMessage()); }); kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback <SendResult<String, Object>>() { @Override public void onSuccess (SendResult<String, Object> success) { if (success == null ) { log.debug("消息发送成功,响应数据不存在" ); } else { RecordMetadata recordMetadata = success.getRecordMetadata(); log.debug("消息发送成功:topic:{} partition:{} offset:{}" , recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()); } } @Override public void onFailure (Throwable throwable) { log.warn("消息发送失败:topic:{} message{} error:{}" , topic, message, throwable.getMessage()); } }); } }
自定义分区器 我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:
若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;
若发送消息时未指定 partition
,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;
partition
和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 partition
;
※ 我们来自定义一个分区策略,将消息发送到我们指定的 partition
,首先新建一个分区器类实现 Partitioner
接口,重写方法,其中partition方法的返回值就表示将消息发送到几号分区,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class CustomPartitioner implements Partitioner { @Override public int partition (String topic, Object key, byte [] keyBytes, Object value, byte [] valueBytes, Cluster cluster) { return 0 ; } @Override public void close () { } @Override public void configure (Map<String, ?> configs) { } }
在application.yml
中配置自定义分区器,配置的值就是分区器类的全路径名,
1 2 # 自定义分区器 spring.kafka.producer.properties.partitioner.class=gt.maxzhao.mq.config.CustomPartitioner
kafka
事务提交如果在发送消息时需要创建事务,可以使用 KafkaTemplate 的 executeInTransaction 方法来声明事务,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Slf4j @Service("kafkaService") public class KafkaServiceImpl implements IKafkaService { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; @Override public void sendMessageWithTransaction (String topic, String message) { kafkaTemplate.executeInTransaction(operations -> { operations.send(topic, message).addCallback(success -> { }, throwable -> { }); throw new RuntimeException (); }); } }
消费者测试 订阅 订阅 topic
是以组的形式进行的
同一组的同一个partition
只能被消费一次。
不同组可以共同消费同一个 partition
。
同一组的监听数量大于 partition
,那么一定有监听空闲。
简单消费 1、指定topic、partition、offset消费
前面监听消费 topic 的时候,监听的是 topic 上所有的消息,如果想指定指定partition、指定offset来消费,直接配置@KafkaListener
注解。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Slf4j @Component public class KafkaConsumer { @KafkaListener(topics = {DefaultTestKafkaInitialConfiguration.TEST_TOPIC}) public void onMessage1 (ConsumerRecord<?, ?> record) { log.debug("消费:{}-{}-{}" , record.topic(), record.partition(), record.value()); } @KafkaListener(id = "consumer1", groupId = "consumer-group", topicPartitions = { @TopicPartition(topic = DefaultTestKafkaInitialConfiguration.TEST_TOPIC, partitions = {"0"}), @TopicPartition(topic = DefaultTestKafkaInitialConfiguration.TEST_TOPIC_2, partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8")) }) public void onMessage2 (ConsumerRecord<?, ?> record) { log.debug("消费2:topic:{} partition:{} value:{} offset:{}" , record.topic(), record.partition(), record.value(), record.offset()); } }
onMessage2
监听的含义:监听 TEST_TOPIC
的0号分区,同时监听 TEST_TOPIC_2
的0号分区和 TEST_TOPIC_2
的1号分区里面 offset
从8开始的消息。
属性解释:
id
:消费者ID;
groupId
:消费组ID;
topics
:监听的 topic
,可监听多个;
topicPartitions
:可配置更加详细的监听信息,可指定 topic
、partition
、offset
监听。
注意:topics
和 topicPartitions
不能同时使用;
最终结果会发现,onMessage1
、onMessage2
都可以消费 TEST_TOPIC
的消息。
批量消费 设置 application.yml
开启批量消费即可,
1 2 3 4 5 6 spring.kafka.listener.type: batch spring.kafka.consumer.max-poll-records: 50 spring.kafka.producer.properties.linger.ms: 50
接收消息时用List来接收,监听代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 @Slf4j @Component public class KafkaConsumer { @KafkaListener(id = "consumer2", groupId = "consumer-group", topics = {DefaultTestKafkaInitialConfiguration.TEST_TOPIC_3}) public void onMessage3 (List<ConsumerRecord<?, ?>> records) { for (ConsumerRecord<?, ?> record : records) { log.debug("批量消费:topic:{} partition:{} value:{} offset:{}" , record.topic(), record.partition(), record.value(), record.offset()); } } } @Slf4j @RunWith(SpringRunner.class) @SpringBootTest(classes = start.DemoApplication.class) public class KafkaTest { @Autowired private IKafkaService kafkaService; @Test public void sendMessage3 () { String message = "这是一个简单的消息!" ; String message2 = "这是一个简单的消息!_2" ; String message3 = "这是一个简单的消息!_3" ; String message4 = "这是一个简单的消息!_4" ; log.debug("发送消息:{}-{}" , DefaultTestKafkaInitialConfiguration.TEST_TOPIC_3, message); kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_3, message); kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_3, message2); try { Thread.sleep(2000 ); } catch (InterruptedException e) { } kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_3, message3); kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_3, message4); try { Thread.sleep(5000 ); } catch (InterruptedException e) { } log.debug("发送完成" ); } }
输出结果为
1 2 3 4 2021-01-26 20:23:20 DEBUG (KafkaConsumer.java:52)- 批量消费:topic:test-topic_3 partition:0 value:这是一个简单的消息! offset:10 2021-01-26 20:23:20 DEBUG (KafkaConsumer.java:52)- 批量消费:topic:test-topic_3 partition:0 value:这是一个简单的消息!_2 offset:11 2021-01-26 20:23:22 DEBUG (KafkaConsumer.java:52)- 批量消费:topic:test-topic_3 partition:0 value:这是一个简单的消息!_3 offset:12 2021-01-26 20:23:22 DEBUG (KafkaConsumer.java:52)- 批量消费:topic:test-topic_3 partition:0 value:这是一个简单的消息!_4 offset:13
ConsumerAwareListenerErrorHandler
异常处理器通过异常处理器,我们可以处理consumer在消费时发生的异常。 新建一个 ConsumerAwareListenerErrorHandler
类型的异常处理方法,用 @Bean
注入, 然后我们将这个异常处理器的 Bean
放到 @KafkaListener
注解的 errorHandler
属性里面,当监听抛出异常的时候,则会自动调用异常处理器,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Slf4j @Component public class KafkaConsumer { @Bean public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler () { return (message, exception, consumer) -> { log.debug("批量消费异常:topic:{} message:{}" , consumer.listTopics(), message.getPayload()); return null ; }; } @KafkaListener(id = "consumer4", groupId = "consumer-group", topics = {DefaultTestKafkaInitialConfiguration.TEST_TOPIC_4}, errorHandler = "consumerAwareErrorHandler") public void onMessage4 (List<ConsumerRecord<?, ?>> records) throws Exception { throw new Exception ("批量消费-模拟异常" ); } } @Slf4j @RunWith(SpringRunner.class) @SpringBootTest(classes = start.DemoApplication.class) public class KafkaTest { @Test public void sendMessage4 () { String message = "这是一个简单的消息_4!" ; log.debug("发送消息:{}-{}" , DefaultTestKafkaInitialConfiguration.TEST_TOPIC_4, message); kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_4, message); log.debug("发送完成" ); } }
执行看一下效果,
1 2021-01-26 20:36:34 DEBUG (KafkaConsumer.java:61)- 批量消费异常:topic:{test-topic_2=[Partition(topic = test-topic_2, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])], test=[Partition(topic =******************************serialized key size = -1, serialized value size = 32, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 这是一个简单的消息_4!)]
消息过滤器 消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由 KafkaListener
处理,不需要的消息则过滤掉。 配置消息过滤只需要为 监听器工厂 配置一个 RecordFilterStrategy
(消息过滤策略),返回 true
的时候消息将会被抛弃,返回 false
时,消息能正常抵达监听容器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 @Slf4j @Component public class KafkaConsumer { @Autowired private ConsumerFactory consumerFactory; @Bean public ConcurrentKafkaListenerContainerFactory filterContainerFactory () { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory (); factory.setConsumerFactory(consumerFactory); factory.setAckDiscarded(true ); factory.setRecordFilterStrategy(consumerRecord -> { if (consumerRecord.value().toString().length() > 20 ) { return false ; } return true ; }); return factory; } @KafkaListener(id = "consumer5", groupId = "consumer-group", topics = {DefaultTestKafkaInitialConfiguration.TEST_TOPIC_5}, containerFactory = "filterContainerFactory") public void onMessage5 (String record) { log.debug("批量消费-拦截长度小于20的字符串: value:{} " , record); } } @Slf4j @RunWith(SpringRunner.class) @SpringBootTest(classes = start.DemoApplication.class) public class KafkaTest { @Test public void sendMessage5 () { String message = "这是一个简单的消息_5!" ; String message2 = "这是一个简单的消息_5!__________________________________________________" ; kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_5, message); kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_5, message2); log.debug("发送完成" ); } }
输出结果
1 2 3 4 2021-01-26 21:03:10 DEBUG (KafkaTest.java:96)- 发送完成 2021-01-26 21:03:10 DEBUG (KafkaServiceImpl.java:33)- 消息发送成功:topic:test-topic_5 partition:0 offset:26 2021-01-26 21:03:10 DEBUG (KafkaConsumer.java:112)- 批量消费-拦截长度小于20的字符串: value:这是一个简单的消息_5!__________________________________________________ 2021-01-26 21:03:10 DEBUG (KafkaServiceImpl.java:33)- 消息发送成功:topic:test-topic_5 partition:0 offset:27
消息转发 消息转发在实际开发中,应用A从TopicA
获取到消息,经过处理后转发到TopicB
, 再由应用B监听处理消息,即一个应用处理完成后将该消息转发至其他应用,完成消息的转发。 在SpringBoot
集成Kafka
实现消息的转发,只需要通过一个@SendTo
注解,被注解方法的return
值即转发的消息内容,如下,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Slf4j @Component public class KafkaConsumer { @KafkaListener(id = "consumer6", groupId = "consumer-group", topics = {DefaultTestKafkaInitialConfiguration.TEST_TOPIC_6}) @SendTo(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_5) public String onMessage6 (List<ConsumerRecord<?, ?>> records) { log.debug("消息转发 6:topic:{} partition:{} value:{} offset:{}" , records.get(0 ).topic(), records.get(0 ).partition(), records.get(0 ).value(), records.get(0 ).offset()); return "消息转发:" + records.get(0 ).value().toString(); } }
输出结果,如果结果不同,可以查看自己设置的延时时间及配置
1 2 3 2021-01-30 14:26:29 DEBUG (KafkaConsumer.java:122)- 消息转发 6:topic:test-topic_6 partition:0 value:这是一个简单的消息_6! offset:10 2021-01-30 14:26:29 DEBUG (KafkaConsumer.java:122)- 消息转发 6:topic:test-topic_6 partition:0 value:这是一个简单的消息_6!———————————————————————————————————————————————— offset:11 2021-01-30 14:26:29 DEBUG (KafkaConsumer.java:112)- 批量消费-拦截长度小于20的字符串: value:消息转发:这是一个简单的消息_6!————————————————————————————————————————————————
消费监听的起停 默认情况下,当消费者项目启动的时候,监听器就开始工作,监听消费发送到指定topic
的消息, 那如果我们不想让监听器立即工作,想让它在我们指定的时间点开始工作, 或者在我们指定的时间点停止工作,使用 KafkaListenerEndpointRegistry
实现:
禁止监听器自启动;
延时开启,发送测试;
延时暂停,发送测试;
延时停止,发送测试;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 @Slf4j @Component public class KafkaConsumer { @Bean public ConcurrentKafkaListenerContainerFactory delayContainerFactory () { ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory (); container.setConsumerFactory(consumerFactory); container.setAutoStartup(false ); return container; } @KafkaListener(id = "consumer7", groupId = "consumer-group", topics = {DefaultTestKafkaInitialConfiguration.TEST_TOPIC_7}, containerFactory = "delayContainerFactory") public void onMessage7 (String record) { log.debug("消息监听的起停控制-启动后可以接收到的消息: value:{} " , record); } } @Slf4j @RunWith(SpringRunner.class) @SpringBootTest(classes = start.DemoApplication.class) public class KafkaTest { @Test public void sendMessage7 () { kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_7, "监听 7 默认未未启动,这是一个简单的消息_7!" ); try { Thread.sleep(1000 ); } catch (InterruptedException e) { } if (!registry.getListenerContainer("consumer7" ).isRunning()) { log.debug("监听 7 还未启动,准备启动监听7" ); registry.getListenerContainer("consumer7" ).start(); } log.debug("监听 7 启动" ); try { Thread.sleep(1000 ); } catch (InterruptedException e) { } kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_7, "监听 7 已启动——————————!" ); try { Thread.sleep(1000 ); } catch (InterruptedException e) { } registry.getListenerContainer("consumer7" ).pause(); log.debug("监听 7 暂停" ); try { Thread.sleep(5000 ); } catch (InterruptedException e) { } kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_7, "监听 7 已暂停——————————————!" ); try { Thread.sleep(1000 ); } catch (InterruptedException e) { } registry.getListenerContainer("consumer7" ).stop(); log.debug("监听 7 停止" ); try { Thread.sleep(1000 ); } catch (InterruptedException e) { } kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_7, "监听 7 已停止——————————————!" ); log.debug("发送完成" ); try { Thread.sleep(5000 ); } catch (InterruptedException e) { } } }
运行结果,
暂停监听后,消息不会被接收(如果 pause
时,延迟1秒,可能消息会被接收到,与设想的不一样,可以调用 isConsumerPaused
查看消息是否被暂停)
监听停止后,就不在接收消息。
虽然监听停止了,但因为 kafka
服务还在,所以消息是发送成功的。
监听停止,经过简单测试发现是立即执行的,监听暂停不是立即执行的。
pause()
方法在下一次poll()
之前生效,而resume()
方法在当前的poll()
之后生效。当一个容器被暂停,它会继续拉取消费者,避免再均衡(如果组管理有使用),但是不会索取任何记录。
pause
后延迟5秒的结果:
1 2 3 4 5 6 7 8 9 10 11 2021-02-01 11:44:22 DEBUG (KafkaServiceImpl.java:30)- 消息发送成功:message:监听 7 默认未未启动,这是一个简单的消息_7! topic:test-topic_7 partition:0 offset:11 2021-02-01 11:44:23 DEBUG (KafkaTest.java:127)- 监听 7 还未启动,准备启动监听7 2021-02-01 11:44:23 DEBUG (KafkaTest.java:130)- 监听 7 启动 2021-02-01 11:44:24 DEBUG (KafkaServiceImpl.java:30)- 消息发送成功:message:监听 7 已启动——————————! topic:test-topic_7 partition:0 offset:12 2021-02-01 11:44:24 DEBUG (KafkaConsumer.java:151)- 消息监听的起停控制-启动后可以接收到的消息: value:监听 7 已启动——————————! 2021-02-01 11:44:25 DEBUG (KafkaTest.java:135)- 监听 7 暂停 2021-02-01 11:44:30 DEBUG (KafkaServiceImpl.java:30)- 消息发送成功:message:监听 7 已暂停——————————————! topic:test-topic_7 partition:0 offset:13 2021-02-01 11:44:31 DEBUG (KafkaTest.java:140)- 监听 7 停止 2021-02-01 11:44:32 DEBUG (KafkaTest.java:143)- 发送完成 2021-02-01 11:44:32 DEBUG (KafkaServiceImpl.java:30)- 消息发送成功:message:监听 7 已停止——————————————! topic:test-topic_7 partition:0 offset:14
pause
后延迟1秒的结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 2021-02-01 11:39:59 DEBUG (KafkaServiceImpl.java:30)- 消息发送成功:message:监听 7 默认未未启动,这是一个简单的消息_7! topic:test-topic_7 partition:0 offset:7 2021-02-01 11:40:00 DEBUG (KafkaTest.java:127)- 监听 7 还未启动,准备启动监听7 2021-02-01 11:40:00 DEBUG (KafkaTest.java:130)- 监听 7 启动 2021-02-01 11:40:01 DEBUG (KafkaServiceImpl.java:30)- 消息发送成功:message:监听 7 已启动——————————! topic:test-topic_7 partition:0 offset:8 2021-02-01 11:40:02 DEBUG (KafkaConsumer.java:151)- 消息监听的起停控制-启动后可以接收到的消息: value:监听 7 已启动——————————! 2021-02-01 11:40:02 DEBUG (KafkaTest.java:135)- 监听 7 暂停 2021-02-01 11:40:03 DEBUG (KafkaServiceImpl.java:30)- 消息发送成功:message:监听 7 已暂停——————————————! topic:test-topic_7 partition:0 offset:9 2021-02-01 11:40:03 DEBUG (KafkaConsumer.java:151)- 消息监听的起停控制-启动后可以接收到的消息: value:监听 7 已暂停——————————————! 2021-02-01 11:40:05 DEBUG (KafkaTest.java:140)- 监听 7 停止 2021-02-01 11:40:06 DEBUG (KafkaTest.java:144)- 发送完成 2021-02-01 11:40:06 DEBUG (KafkaServiceImpl.java:30)- 消息发送成功:message:监听 7 已停止——————————————! topic:test-topic_7 partition:0 offset:10
分组测试消费 测试同一组、不同组消费同一个 topic
的partition
查看 topic
:bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test-topic_8
1 2 Topic: test-topic_8 PartitionCount: 1 ReplicationFactor: 1 Configs: Topic: test-topic_8 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
当前 topic
只有一个 partition
。
测试前把当前 kafka
配置改为立即发送,并且设置不多发。
1 2 3 4 spring: kafka: producer.properties.linger.ms: 0
测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Slf4j @Component public class KafkaConsumer { @KafkaListener(id = "consumer8", groupId = "consumer-group8", topics = {DefaultTestKafkaInitialConfiguration.TEST_TOPIC_8}) public void onMessage8 (String record) { log.debug("分组测试消费:group8-consumer8 收到消息: value:{} " , record); } @KafkaListener(id = "consumer8_1", groupId = "consumer-group8", topics = {DefaultTestKafkaInitialConfiguration.TEST_TOPIC_8}) public void onMessage8_1 (String record) { log.debug("分组测试消费:group8-consumer8_1 收到消息: value:{} " , record); } @KafkaListener(id = "consumer8_2", groupId = "consumer-group8_2", topics = {DefaultTestKafkaInitialConfiguration.TEST_TOPIC_8}) public void onMessage8_2 (String record) { log.debug("分组测试消费:group8_2-consumer8_2 收到消息: value:{} " , record); } } @Slf4j @RunWith(SpringRunner.class) @SpringBootTest(classes = start.DemoApplication.class) public class KafkaTest { @Test public void sendMessage8 () { kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_8, "这是一个简单的消息_8!" ); log.debug("发送完成" ); } }
测试结果
1 2 3 4 2021-02-01 15:40:43 DEBUG (KafkaTest.java:172)- 发送完成 2021-02-01 15:40:43 DEBUG (KafkaServiceImpl.java:30)- 消息发送成功:message:这是一个简单的消息_8! topic:test-topic_8 partition:0 offset:0 2021-02-01 15:40:43 DEBUG (KafkaConsumer.java:168)- 分组测试消费:group8-consumer8_1 收到消息: value:这是一个简单的消息_8! 2021-02-01 15:40:43 DEBUG (KafkaConsumer.java:176)- 分组测试消费:group8_2-consumer8_2 收到消息: value:这是一个简单的消息_8! :0 offset:0
也就是意味着group8
中的两个监听只有一个监听到轮数据。
以下来自简书
简介 kafka是一个分布式消息队列。具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。一般在架构设计中起到解耦、削峰、异步处理的作用。
kafka对外使用topic的概念,生产者往topic里写消息,消费者从读消息。为了做到水平扩展,一个topic实际是由多个partition组成的,遇到瓶颈时,可以通过增加partition的数量来进行横向扩容。单个parition内是保证消息有序。
每新写一条消息,kafka就是在对应的文件append写,所以性能非常高。
kafka的总体数据流是这样的:
kafka data flow
大概用法就是,Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉去指定Topic的消息,然后进行业务处理。 图中有两个topic,topic 0有两个partition,topic 1有一个partition,三副本备份。可以看到consumer gourp 1中的consumer 2没有分到partition处理,这是有可能出现的,下面会讲到。
关于broker、topics、partitions的一些元信息用zk来存,监控和路由啥的也都会用到zk。
生产 基本流程是这样的:
kafka sdk product flow.png
创建一条记录,记录中一个要指定对应的topic和value,key和partition可选。 先序列化,然后按照topic和partition,放进对应的发送队列中。kafka produce都是批量请求,会积攒一批,然后一起发送,不是调send()就进行立刻进行网络发包。 如果partition没填,那么情况会是这样的:
key有填 按照key进行哈希,相同key去一个partition。(如果扩展了partition的数量那么就不能保证了)
key没填 round-robin来选partition
这些要发往同一个partition的请求按照配置,攒一波,然后由一个单独的线程一次性发过去。
API 有high level api,替我们把很多事情都干了,offset,路由啥都替我们干了,用以来很简单。 还有simple api,offset啥的都是要我们自己记录。
partition 当存在多副本的情况下,会尽量把多个副本,分配到不同的broker上。kafka会为partition选出一个leader,之后所有该partition的请求,实际操作的都是leader,然后再同步到其他的follower。 当一个broker歇菜后,所有leader在该broker上的partition都会重新选举,选出一个leader。(这里不像分布式文件存储系统那样会自动进行复制保持副本数)
然后这里就涉及两个细节:怎么分配partition,怎么选leader。
关于partition的分配,还有leader的选举,总得有个执行者。在kafka中,这个执行者就叫controller。 kafka使用zk在broker中选出一个controller,用于partition分配和leader选举。
partition的分配
将所有Broker(假设共n个Broker)和待分配的Partition排序
将第i个Partition分配到第(i mod n)个Broker上 (这个就是leader)
将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上
leader容灾 controller会在Zookeeper的/brokers/ids节点上注册Watch,一旦有broker宕机,它就能知道。当broker宕机后,controller就会给受到影响的partition选出新leader。controller从zk的/brokers/topics/[topic]/partitions/[partition]/state中,读取对应partition的ISR(in-sync replica已同步的副本)列表,选一个出来做leader。 选出leader后,更新zk,然后发送LeaderAndISRRequest给受影响的broker,让它们改变知道这事。为什么这里不是使用zk通知,而是直接给broker发送rpc请求,我的理解可能是这样做zk有性能问题吧。
如果ISR列表是空,那么会根据配置,随便选一个replica做leader,或者干脆这个partition就是歇菜。如果ISR列表的有机器,但是也歇菜了,那么还可以等ISR的机器活过来。
多副本同步 这里的策略,服务端这边的处理是follower从leader批量拉取数据来同步。但是具体的可靠性,是由生产者来决定的。 生产者生产消息的时候,通过request.required.acks参数来设置数据的可靠性。
acks
what happen
0
which means that the producer never waits for an acknowledgement from the broker.发过去就完事了,不关心broker是否处理成功,可能丢数据。
1
which means that the producer gets an acknowledgement after the leader replica has received the data. 当写Leader成功后就返回,其他的replica都是通过fetcher去同步的,所以kafka是异步写,主备切换可能丢数据。
-1
which means that the producer gets an acknowledgement after all in-sync replicas have received the data. 要等到isr里所有机器同步成功,才能返回成功,延时取决于最慢的机器。强一致,不会丢数据。
在acks=-1的时候,如果ISR少于min.insync.replicas指定的数目,那么就会返回不可用。
这里ISR列表中的机器是会变化的,根据配置replica.lag.time.max.ms,多久没同步,就会从ISR列表中剔除。以前还有根据落后多少条消息就踢出ISR,在1.0版本后就去掉了,因为这个值很难取,在高峰的时候很容易出现节点不断的进出ISR列表。
从ISA中选出leader后,follower会从把自己日志中上一个高水位后面的记录去掉,然后去和leader拿新的数据。因为新的leader选出来后,follower上面的数据,可能比新leader多,所以要截取。这里高水位的意思,对于partition和leader,就是所有ISR中都有的最新一条记录。消费者最多只能读到高水位;
从leader的角度来说高水位的更新会延迟一轮,例如写入了一条新消息,ISR中的broker都fetch到了,但是ISR中的broker只有在下一轮的fetch中才能告诉leader。
也正是由于这个高水位延迟一轮,在一些情况下,kafka会出现丢数据和主备数据不一致的情况,0.11开始,使用leader epoch来代替高水位。(https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation#KIP-101-AlterReplicationProtocoltouseLeaderEpochratherthanHighWatermarkforTruncation-Scenario1:HighWatermarkTruncationfollowedbyImmediateLeaderElection )
*思考:* 当acks=-1时
是follwers都来fetch就返回成功,还是等follwers第二轮fetch?
leader已经写入本地,但是ISR中有些机器失败,那么怎么处理呢?
消费 订阅topic是以一个消费组来订阅的,一个消费组里面可以有多个消费者。同一个消费组中的两个消费者,不会同时消费一个partition。换句话来说,就是一个partition,只能被消费组里的一个消费者消费 ,但是可以同时被多个消费组消费。因此,如果消费组内的消费者如果比partition多的话,那么就会有个别消费者一直空闲。
untitled_page.png
API 订阅topic时,可以用正则表达式,如果有新topic匹配上,那能自动订阅上。
offset的保存 一个消费组消费partition,需要保存offset记录消费到哪,以前保存在zk中,由于zk的写性能不好,以前的解决方法都是consumer每隔一分钟上报一次。这里zk的性能严重影响了消费的速度,而且很容易出现重复消费。 在0.10版本后,kafka把这个offset的保存,从zk总剥离,保存在一个名叫__consumeroffsets topic的topic中。写进消息的key由groupid、topic、partition组成,value是偏移量offset。topic配置的清理策略是compact。总是保留最新的key,其余删掉。一般情况下,每个key的offset都是缓存在内存中,查询的时候不用遍历partition,如果没有缓存,第一次就会遍历partition建立缓存,然后查询返回。
确定consumer group位移信息写入__consumers_offsets的哪个partition,具体计算公式:
1 2 3 __consumers_offsets partition = Math .abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
*思考:* 如果正在跑的服务,修改了offsets.topic.num.partitions,那么offset的保存是不是就乱套了?
分配partition–reblance 生产过程中broker要分配partition,消费过程这里,也要分配partition给消费者。类似broker中选了一个controller出来,消费也要从broker中选一个coordinator,用于分配partition。 下面从顶向下,分别阐述一下
怎么选coordinator。
交互流程。
reblance的流程。
选coordinator
看offset保存在那个partition
该partition leader所在的broker就是被选定的coordinator
这里我们可以看到,consumer group的coordinator,和保存consumer group offset的partition leader是同一台机器。
交互流程 把coordinator选出来之后,就是要分配了 整个流程是这样的:
consumer启动、或者coordinator宕机了,consumer会任意请求一个broker,发送ConsumerMetadataRequest请求,broker会按照上面说的方法,选出这个consumer对应coordinator的地址。 2.consumer 发送heartbeat请求给coordinator,返回IllegalGeneration的话,就说明consumer的信息是旧的了,需要重新加入进来,进行reblance。返回成功,那么consumer就从上次分配的partition中继续执行。
reblance流程
consumer给coordinator发送JoinGroupRequest请求。
这时其他consumer发heartbeat请求过来时,coordinator会告诉他们,要reblance了。
其他consumer发送JoinGroupRequest请求。
所有记录在册的consumer都发了JoinGroupRequest请求之后,coordinator就会在这里consumer中随便选一个leader。然后回JoinGroupRespone,这会告诉consumer你是follower还是leader,对于leader,还会把follower的信息带给它,让它根据这些信息去分配partition
consumer向coordinator发送SyncGroupRequest,其中leader的SyncGroupRequest会包含分配的情况。
coordinator回包,把分配的情况告诉consumer,包括leader。
当partition或者消费者的数量发生变化时,都得进行 reblance
。 列举一下会 reblance
的情况:
增加 partition
增加消费者
消费者主动关闭
消费者宕机了
coordinator
自己也宕机了
消息投递语义 kafka支持3种消息投递语义 At most once:最多一次,消息可能会丢失,但不会重复 At least once:最少一次,消息不会丢失,可能会重复 Exactly once:只且一次,消息不丢失不重复,只且消费一次(0.11中实现,仅限于下游也是kafka)
在业务中,常常都是使用At least once的模型,如果需要可重入的话,往往是业务自己实现。
At least once 先获取数据,再进行业务处理,业务处理成功后commit offset。 1、生产者生产消息异常,消息是否成功写入不确定,重做,可能写入重复的消息 2、消费者处理消息,业务处理成功后,更新offset失败,消费者重启的话,会重复消费
At most once 先获取数据,再commit offset,最后进行业务处理。 1、生产者生产消息异常,不管,生产下一个消息,消息就丢了 2、消费者处理消息,先更新offset,再做业务处理,做业务处理失败,消费者重启,消息就丢了
Exactly once 思路是这样的,首先要保证消息不丢,再去保证不重复。所以盯着At least once的原因来搞。 首先想出来的:
生产者重做导致重复写入消息—-生产保证幂等性
消费者重复消费—消灭重复消费,或者业务接口保证幂等性重复消费也没问题
由于业务接口是否幂等,不是kafka能保证的,所以kafka这里提供的exactly once是有限制的,消费者的下游也必须是kafka。 所以一下讨论的,没特殊说明,消费者的下游系统都是kafka(注:使用kafka conector,它对部分系统做了适配,实现了exactly once)。
生产者幂等性好做,没啥问题。
解决重复消费有两个方法:
下游系统保证幂等性,重复消费也不会导致多条记录。
把commit offset和业务处理绑定成一个事务。
本来exactly once实现第1点就ok了。
但是在一些使用场景下,我们的数据源可能是多个topic,处理后输出到多个topic,这时我们会希望输出时要么全部成功,要么全部失败。这就需要实现事务性。 既然要做事务,那么干脆把重复消费的问题从根源上解决,把commit offset和输出到其他topic绑定成一个事务。
生产幂等性 思路是这样的,为每个producer分配一个pid,作为该producer的唯一标识。producer会为每一个<topic,partition>维护一个单调递增的seq。类似的,broker也会为每个<pid,topic,partition>记录下最新的seq。当req_seq == broker_seq+1时,broker才会接受该消息。因为:
消息的seq比broker的seq大超过时,说明中间有数据还没写入,即乱序了。
消息的seq不比broker的seq小,那么说明该消息已被保存。
解决重复生产
事务性/原子性广播 场景是这样的:
先从多个源topic中获取数据。
做业务处理,写到下游的多个目的topic。
更新多个源topic的offset。
其中第2、3点作为一个事务,要么全成功,要么全失败。这里得益与offset实际上是用特殊的topic去保存,这两点都归一为写多个topic的事务性处理。
基本思路是这样的: 引入tid(transaction id),和pid不同,这个id是应用程序提供的,用于标识事务,和producer是谁并没关系。就是任何producer都可以使用这个tid去做事务,这样进行到一半就死掉的事务,可以由另一个producer去恢复。 同时为了记录事务的状态,类似对offset的处理,引入transaction coordinator用于记录transaction log。在集群中会有多个transaction coordinator,每个tid对应唯一一个transaction coordinator。 注:transaction log删除策略是compact,已完成的事务会标记成null,compact后不保留。
做事务时,先标记开启事务,写入数据,全部成功就在transaction log中记录为prepare commit状态,否则写入prepare abort的状态。之后再去给每个相关的partition写入一条marker(commit或者abort)消息,标记这个事务的message可以被读取或已经废弃。成功后在transaction log记录下commit/abort状态,至此事务结束。
数据流:
Kafka Transactions Data Flow.png
首先使用tid请求任意一个broker(代码中写的是负载最小的broker),找到对应的transaction coordinator。
请求transaction coordinator获取到对应的pid,和pid对应的epoch,这个epoch用于防止僵死进程复活导致消息错乱,当消息的epoch比当前维护的epoch小时,拒绝掉。tid和pid有一一对应的关系,这样对于同一个tid会返回相同的pid。
client先请求transaction coordinator记录<topic,partition>的事务状态,初始状态是BEGIN,如果是该事务中第一个到达的<topic,partition>,同时会对事务进行计时;client输出数据到相关的partition中;client再请求transaction coordinator记录offset的<topic,partition>事务状态;client发送offset commit到对应offset partition。
client发送commit请求,transaction coordinator记录prepare commit/abort,然后发送marker给相关的partition。全部成功后,记录commit/abort的状态,最后这个记录不需要等待其他replica的ack,因为prepare不丢就能保证最终的正确性了。
这里prepare的状态主要是用于事务恢复,例如给相关的partition发送控制消息,没发完就宕机了,备机起来后,producer发送请求获取pid时,会把未完成的事务接着完成。
当partition中写入commit的marker后,相关的消息就可被读取。所以kafka事务在prepare commit到commit这个时间段内,消息是逐渐可见的,而不是同一时刻可见。
详细细节可看:https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-TransactionalGuarantees
消费事务 前面都是从生产的角度看待事务。还需要从消费的角度去考虑一些问题。 消费时,partition中会存在一些消息处于未commit状态,即业务方应该看不到的消息,需要过滤这些消息不让业务看到,kafka选择在消费者进程中进行过来,而不是在broker中过滤,主要考虑的还是性能。kafka高性能的一个关键点是zero copy,如果需要在broker中过滤,那么势必需要读取消息内容到内存,就会失去zero copy的特性。
文件组织 kafka的数据,实际上是以文件的形式存储在文件系统的。topic下有partition,partition下有segment,segment是实际的一个个文件,topic和partition都是抽象概念。
在目录/${topicName}-{$partitionid}/下,存储着实际的log文件(即segment),还有对应的索引文件。
每个segment文件大小相等,文件名以这个segment中最小的offset命名,文件扩展名是.log;segment对应的索引的文件名字一样,扩展名是.index。有两个index文件,一个是offset index用于按offset去查message,一个是time index用于按照时间去查,其实这里可以优化合到一起,下面只说offset index。总体的组织是这样的:
kafka 文件组织.png
为了减少索引文件的大小,降低空间使用,方便直接加载进内存中,这里的索引使用稀疏矩阵,不会每一个message都记录下具体位置,而是每隔一定的字节数,再建立一条索引。 索引包含两部分,分别是baseOffset,还有position。
baseOffset:意思是这条索引对应segment文件中的第几条message。这样做方便使用数值压缩算法来节省空间。例如kafka使用的是varint。
position:在segment中的绝对位置。
查找offset对应的记录时,会先用二分法,找出对应的offset在哪个segment中,然后使用索引,在定位出offset在segment中的大概位置,再遍历查找message。
常用配置项 broker配置
配置项
作用
broker.id
broker的唯一标识
auto.create.topics.auto
设置成true,就是遇到没有的topic自动创建topic。
log.dirs
log的目录数,目录里面放partition,当生成新的partition时,会挑目录里partition数最少的目录放。
topic配置
配置项
作用
num.partitions
新建一个topic,会有几个partition。
log.retention.ms
对应的还有minutes,hours的单位。日志保留时间,因为删除是文件维度而不是消息维度,看的是日志文件的mtime。
log.retention.bytes
partion最大的容量,超过就清理老的。注意这个是partion维度,就是说如果你的topic有8个partition,配置1G,那么平均分配下,topic理论最大值8G。
log.segment.bytes
一个segment的大小。超过了就滚动。
log.segment.ms
一个segment的打开时间,超过了就滚动。
message.max.bytes
message最大多大
关于日志清理,默认当前正在写的日志,是怎么也不会清理掉的。 还有0.10之前的版本,时间看的是日志文件的mtime,但这个指是不准确的,有可能文件被touch一下,mtime就变了。因此在0.10版本开始,改为使用该文件最新一条消息的时间来判断。 按大小清理这里也要注意,Kafka在定时任务中尝试比较当前日志量总大小是否超过阈值至少一个日志段的大小。如果超过但是没超过一个日志段,那么就不会删除。
本文地址: https://github.com/maxzhao-it/blog/post/50825/