更多博客请见 我的语雀知识库
Kafka支持幂等操作,保证了消息不重复,不丢失。从Kafka3.0版本开始默认开启幂等。
开启幂等需要配置消息发送的重试次数

// 开启生产者幂等
props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); 
props.setProperty(ProducerConfig.ACKS_CONFIG, "all"); 
// 配置重试次数
props.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE)); 
// 如果 Kafka 版本 >= 2.0,可配置为5;否则配置成 1
props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); 

为什么要设置发送消息的重试次数

当生产者向 Broker 端发送消息失败时,Broker 端会向生产者端返回错误码。因此我们需要重新发送该消息。这是尽量避免消息丢失。但是会有这样一种情况: 第一次发送消息时,虽然 Broker 已经收到消息了,但由于网络抖动等原因导致 Broker 向生产者反馈的 ack 失败。因此,生产者会重新发送这批消息,最终导致消息重复。 但还是要开启重试机制。重复的问题留给幂等性解决。

幂等,如何做到?

做到不重复

要做到幂等性,要解决下面的问题:

  1. Broker 端能够鉴别一条数据到底是不是重复的数据?最常用的手段是通过 唯一键/唯一id 来判断,因此需要在 Broker 端缓存已经处理的唯一键记录,这样才能判断出新发送过来的数据是否重复。
  2. 唯一键应该选择什么粒度?对于分布式系统来说,肯定不建议使用全局唯一键。Kafka 在这方面的考虑是在分区的维度上面设计唯一键,让每个分区的 Leader 判断数据是否重复。
  3. 将唯一键设计到分区粒度是否有问题?有可能有多个生产者同时向一个 Partition 中写入数据,各个生产者都是独立的个体,他们之间的通信比较困难。因此,选择设计的唯一键是 Producer + TopicPartition 维度。
  4. 如何保证消息的顺序性?生产者在发送信息时会同时发送一个序列号,序列号以 TopicPartition 为基础,从 0 开始,并随着每条信息的产生而递增。在 Broker 端通过序列号是否连续来判断消息是否是顺序发送。

出于对以上内存的考虑,Kafka 要做到幂等性,就要做到以下几个方面:

Broker角度

  1. PID(Producer ID):给每个生产者都指定的生产者 ID(PID)。
  2. Sequence Number:生产者在发送信息时会发送一个序列号,序列号以 TopicPartition 为基础,从 0 开始,并随着每条信息的产生而递增。Broker 会存储每个生产者为给定的 TopicPartition 发送的序列号,并据此判断该消息是否是:
    • 新消息:Broker 新收到的消息的序列号刚好比给定生产者在本地存储的序列号大 1。
    • 重复消息:Broker 新收到的消息的序列号小于等于给定生产者保存的最大序列号。
    • 失序:Broker 新收到的消息序列号 - 本地最大序列号的差值大于 1,这意味着有些消息丢失了。

Producer角度

上面是从服务器角度考虑的幂等性实现。但如果我们能在消息发出去之前就判断出这条消息是重复的岂不更好? Kafka的消息是批发送的,批消息叫做ProducerBatch,在一个ProducerBatch中的消息都是要发往同一个Partition的。如果一条消息重复发送那么其所属的ProducerBatch一定也重复发送了。 每当有新的 ProducerBatch 被添加到 RecordAccumulator 时,就会有新的元数据 batchMetadata 被添加到batchMetadata队列中(该队列最大为5),一个batchMetadata 对应一个ProducerBatch。而 batchMetadata 中主要包括lastSeq、lastOffset、offsetDelta 和 timestamp 等元数据。

  1. lastSeq:序列号是每条添加到 ProducerBatch 的编号,而 lastSeq 记录的是每个 ProducerBatch 的最后一条消息的序列号。
  2. lastOffset:每个 ProducerBatch 中的最后一条消息的 offset。
  3. offsetDelta :每个 ProducerBatch 中,最后一条消息和第一条消息的 offset 差值。lastSeq - offsetDelta 可以得到第一条消息的 seq,lastOffset - offsetDelta 可以得到第一条消息的 offset。
  4. timestamp:每个 ProducerBatch 最后一条消息的添加时间。

在Producer内部有专门的方法校验ProducerBatch是否有重复消息:

  • 遍历 batchMetadata队列中的元数据,如果新发送的 ProducerBatch 的 firstSeq 和 lastSeq 都和 batchMetadata队列中缓存的某个 batchMetadata 相同,说明是重复批次。

简单说就是: Producer维护了发往某个Partition的最近五个批次的元数据。里面保存了每一个批次的最新和最旧序列号。如果现在准备发送的批消息的最新最旧序列号和最近五个批消息的最新最近系列号重合了,说明是重复消息。不予发送。 为什么是最近五个?因为源代码里写死了。

做到不乱序

存储ProducerBatch的是一个队列。假如现在情况如下:

                  】 <— 1 2 3 4 5 6 7 8 9

顺序发送数据包123,456,789。但是由于网络链路的原因,123先到,789次之,造成了乱序

1 2 3 7 8 9       】 4 5 6 <— Empty Queue

如果不开启幂等性,Broker不管任何顺序都是接收不误的。一旦开启了幂等性,Broker就会拒接接收789数据包。将789退还给双端队列。

1 2 3             】 4 5 6 <— 7 8 9

接收456,再接收789

1 2 3 4 5 6       】 <— 7 8 9

这样就解决了乱序问题了。

总结

这样实现的幂等还是有缺陷的:

  1. 只能保证 Producer 在单个会话内消息的不丟不重不乱,如果 Producer 出现意外宕机再重启是无法保证的(由于在幂等性的场景下,无法获取 Producer 宕机之前的状态信息比如PID,PID每次都是不一样的。因此是无法做到跨会话级别的不丢不重不乱);
  2. 幂等性不能跨多个 TopicPartition,只能保证 Producer 在单个 TopicPartition 内的幂等性,当涉及多个 TopicPartition 时,这些状态信息并没有同步。

如果需要实现跨会话、跨多个 Topic-Partition 的幂等性,需要使用 Kafka 的事务性来实现。