Kafka Producer(下)

words: 3.9k    views:    time: 14min

关于Kafka Producer的实现细节与常见问题

幂等性

在一个分布式消息系统中,各个角色均有可能发生故障。以Apache Kafka为例,Broker和Client都有可能会崩溃,Broker与Client之间的网络请求与响应都有可能丢失。

根据Producer处理这类故障时采取的策略,可以分为以下几种语义:

  • 至少一次(At Least Once):当发生请求超时或者服务端错误时,Producer重复尝试发送消息直至成功。这样做可以保证每条消息都被写入Topic,但是可能会发生重复
  • 至多一次(At Most Once):在超时或报错时Producer不进行重试,每条消息仅发送一次。这样做可以避免消息重复,但也可能会导致消息丢失
  • 精确一次(Exactly Once):Producer进行适当的重试,以确保每条消息会且仅会被写入Topic 一次,既不重复,也不遗漏。Exactly Once的语义是最理想的实现,可以满足绝大多数业务场景的需求,但也是最难实现的,它需要Client与Broker之间的配合

开启幂等性

Kafka Producer开启幂等性只需要设置几个配置项,无需修改代码

相关配置

  • acks:当指定数量的副本收到消息后,Producer才会认为消息写入完成,默认为all

acks=all:Producer会等待所有同步的(in sync)副本的响应
acks=1:Producer会等待leader broker的响应
acks=0:Producer不会等待任何broker的响应,消息写入网络层后即认为写入成功

  • enable.idempotence:开启幂等性,保证每条消息写入且仅被写入一次,同时保证消息按照发送顺序写入,默认为true

开启此配置时,需要保证max.in.flight.requests.per.connection(在收到Broker响应ack之前,最多可以发送的未完成请求数量)不大于5,retries大于0,acks设置为all

Producer只能避免由自身重试策略(Producer、Broker 或网络出错)导致的消息重复,无法处理以下几种情况:
只保证会话级别的不重不漏,当Producer发生重启时,无法保证重启后与重启前发送的消息不重复
只保证Partition级别的不重不漏,不能保证向多个Partition发送的消息不重复
如果发送耗时超过了delivery.timeout.ms,Producer抛出TimeoutException,此时无法保证对应的消息是否已经被Broker持久化,需要上层根据情况进行处理

实现原理

为了实现幂等性,Kafka引入了以下两个概念:

  • Producer ID(以下简称 PID):Producer的唯一标识。PID由Idempotent Producer在首次发送消息前,请求Broker分配获得,是全局唯一的。PID仅在Producer和Broker内部使用,不会暴露给Client使用者
  • Sequence Number(以下简称 SEQ):消息的序列号。该序列号在(PID, Partition) 维度上严格递增。事实上SEQ会存储在Record Batch的头中,作为Batch中第一条消息的SEQ,Batch中其它消息的SEQ依次递增

另外还有Producer Epoch,它与PID结合才会唯一标识一个Producer,它的在不同的场景下有不同的用途:

对于开启事务能力的Producer(配置了transactional.id),Producer Epoch同样由Broker分配。这样做可以保证,多个具有相同Transactional ID的Producer中仅会有一个生效,即Fence Producer

对于没有开启事务能力的Producer,Producer Epoch则由Producer自己维护,它会在需要重置序列号(Reset SEQ)时增长,并将SEQ重置到0

对于PID与SEQ,均会跟随消息持久化到Log中

服务端

Broker会在内存中记录每个Producer的状态信息,包括Producer Epoch与每个Partition最新写入的5个Record Batch的元数据(包括SEQ、offset、timestamp等),用于判断Producer发送的请求是否存在重复或者遗漏

另外,这些状态信息也会定期进行快照,Broker在重启时会基于快照与Log中的信息恢复出这些状态信息(这里硬编码的5也是Producer配置 max.in.flight.requests.per.connection的上限)

当Broker收到一个Record Batch后,在进行完必要前置操作后、真正持久化到Log前,会检查该Batch上的PID、Producer Epoch与SEQ,具体地说:

  1. 检查该Record Batch是否与本地记录的5个Record Batch一致。若是,则认为Producer出于某些原因重复发送了该Record Batch,不进行任何操作,直接返回本地记录的元数据(主要是offset)

  2. 检查之前是否记录了该PID对应的状态信息,若没有,检查SEQ是否为0

  • 若是,则认为这是一个全新的Producer,记录该Producer相关信息,并写入Record Batch
  • 若否,则报错 UnknownProducerIdException
  1. 检查Producer Epoch是否与本地记录一致,若不一致,检查SEQ是否为0
  • 若是,则认为该Producer出于某些原因重置了SEQ,更新记录,并写入Record Batch
  • 若否,则报错OutOfOrderSequenceException
  1. 检查SEQ是否与最近一次写入的Record Batch的SEQ连续
  • 若是,则缓存该Record Batch的元数据,并写入
  • 若否,则报错OutOfOrderSequenceException

经过上述校验处理,可以确保在客户端侧,由同一个Producer向同一个Partition写入的Record Batch都是连续的(基于 SEQ),不会存在遗漏或重复

客户端

Producer对于幂等性的处理相对更加复杂,主要有以下两个难点:

  • Producer在发送时可能会发生超时,在超时时,可能存在两种可能:“Broker没有收到请求” 或 “Broker处理了请求,但Producer没有收到响应”。这就导致Producer难以确认当某个Produce请求超时时,Broker是否已经进行了持久化

  • Producer可能会向同一个Broker同时发送多个Produce请求,当其中一个或多个报错时,需要根据不同情况,对它们以及后续的请求采取不同的处理方式

基本概念

  • 在途Batch(Inflight Batch)

Producer会按Partition维度,记录已经发送请求但尚未收到响应的Batch;特别地,对于幂等Producer,还会额外记录每个Inflight Batch的SEQ,并按照SEQ排序

  • 未解决的Batch(Unresolved Batch)

Producer在发送消息时会进行数次重试,直至总耗时超出delivery.timeout.ms。如果某个Batch发生了Delivery Timeout,则认为其为Unresolved

当某个Batch被标记为Unresolved时,Producer无法判断Broker是否已经持久化这个Batch,只能通过检查这个Batch的后续Batch是否被Broker持久化(或报错OutOfOrderSequenceException):若后续Batch写入成功,则认为它之前的Unresolved Batch也已经写入完成;否则,则认为前面的Unresolved Batch没有写入完成,需要重置SEQ

  • 提升Epoch(Bump Epoch)与重置SEQ(Reset Sequence Number)

当Producer遇到无法通过重试解决的问题时(例如Inflight Batch均响应完成,但仍存在Unresolved Batch时;或Broker报错UnknownProducerIdException时),会执行Bump Epoch & Reset SEQ的操作

具体会将Producer Epoch加一,并将出错Partition的所有Inflight Batch从零开始重新编号重新发送,并清空Unresolved Batch

发送流程

  1. 判断Unresolved Batches的状态
  • 如果确认Unresolved Batch实际已写入完成,则将其从Unresolved Batches中移除
  • 如果确认Unresolved Batch实际并没有写入(判断Inflight Batches是否为空),则Bump Epoch & Reset SEQ
  1. 检查目前该Partition能否发送新的Batch,不能发送的场景有:
  • 存在Unresolved Batch
  • 之前发生了Bump Epoch,且仍存在老的Epoch的Inflight Batch
  • 之前某个Batch正在重试(也就是说,幂等Producer在重试时,Inflight始终为1)
  1. 如果之前发生了Bump Epoch,且已经不存在老Epoch的Inflight Batch,则Reset SEQ

  2. 获取对应Partition的下一个SEQ,并设置到Batch中

  3. 将Batch加入到Inflight Batches中

  4. 检查是否存在Delivery Timeout的Batch,若存在,则将其加入到Unresolved Batches中

  5. 向Broker发送Produce请求,等待响应

  6. 收到响应后,检查Error Code

  • 若为不可重试错误(例如AuthorizationException),则Bump Epoch & Reset SEQ,并向上层报错
  • 若为可重试错误(例如TimeoutException),则加入重试队列,等待下次发送。

如果报错为UnknownProducerIdException,且之前没有Reset SEQ,则Bump Epoch & Reset SEQ并重试;否则直接重试
如果报错为OutOfOrderSequenceException,且Unresolved Batch为空,或该Batch恰好为SEQ最大的Unresolved Batch的下一个,则Bump Epoch & Reset SEQ并重试;否则直接重试

  1. 从Inflight Batches中移除,并向上层返回成功

关于Inflight Request上限

Producer的配置max.in.flight.requests.per.connection存在上限5,这同时也是Broker缓存每个PID在每个Partition发送过的最新的Batch的数量。这样做的原因是,当Inflight Request数量超过Broker缓存的Batch数量时(假设 1),存在以下反例

  • Producer向Broker先后发送了两个Produce Request,且这两个请求中,均包含一个发送给Partition p1的Batch,记为b1与b2,其中b1 SEQ < b2 SEQ

  • Broker将b1 与 b2依次持久化完成(此时Broker缓存中会记录b2的元数据),但由于网络问题,Producer没有收到响应

  • Producer发现超时后重试,重新发送包含b1的Produce Request

  • Broker收到Request后发现b1 SEQ小于缓存中的b2 SEQ,可以推测出该消息为重复的,不应写入,而是直接返回offset等信息;但由于缓存中并没有b1相关元数据,Broker也就无法返回offset信息

其他细节
  • Producer Epoch 溢出处理

当Producer Epoch溢出时(short最大为32767),Producer会将PID与Epoch重置,并向Broker请求分配一个新的PID与Epoch,并Reset SEQ

  • SEQ 溢出处理

当SEQ溢出时(int最大为2147483647),下一条消息的SEQ会轮转回0。考虑到Inflight Batch的数量与Batch中消息的数量的限制,不会发生问题

  • UnknownProducerIdException 处理

通常的报错场景:由于Log Retention限制,Broker将Log中某个Producer发送的消息均删除了,此时Broker重启,缓存中不再有该Producer的状态信息。如果此时Producer尝试接着之前的SEQ发送消息,由于Broker无法识别PID,则会报错。这种情况,Producer只需Bump Epoch并 Reset SEQ,重新发送消息即可

场景示例

Broker没有收到请求

Producer没有收到响应

实现细节

消息压缩

Kafka Producer支持在客户端对消息进行压缩,以减少消息的网络传输成本与存储成本。可以通过Producer配置compression.type来指定压缩算法,支持的选项有none、gzip、snappy、lz4、zstd,默认为none

开启压缩后,可以节约网络带宽与Broker存储空间,但是会增加Producer与Broker的CPU消耗。此外,由于压缩是以Batch维度进行的,更好的攒批(更大的 Batch)会带来更好的压缩效果

在实现消息压缩时,会存在这样一个矛盾:只有在真正将消息压缩到Batch中之后,才能判断它实际(压缩后)占用了多大的大小;但为了不超过batch.size的限制,需要在消息写入Batch之前就判断其压缩后的大小。

为了解决这个问题,Kafka提出了一个自适应的压缩率估计算法:

  1. 维护一个Map,其中记录了每个Topic上各个压缩算法的“估计压缩率”,初始值为1.0

  2. 当某个Batch写满并压缩完成后,计算其实际压缩率(压缩后大小 / 压缩后大小)

  3. 基于这个实际压缩率调整估计压缩率

  • 如果实际压缩率 < 估计压缩率,将估计压缩率向实际压缩率靠近,最大减少0.005
  • 如果实际压缩率 > 估计压缩率,将估计压缩率向实际压缩率靠近,最大增加 0.05
  1. 在尝试向新的Batch写入消息时,将使用新的估计压缩率 * 1.05 作为估算值
Batch分裂

为了应对极端情况(消息可压缩性波动导致估计值大幅偏离实际值),Kafka支持了Batch分裂的逻辑。当压缩率估计值大幅低于实际值时,可能会导致在一个Batch中写入了过多的消息以至于超出了Broker或Topic的限制(message.max.bytes 或 max.message.bytes)

当发生这样的问题时,就需要Producer将过大的Batch拆分开并重新发送,具体流程如下:

  • Producer收到MESSAGE_TOO_LARGE报错

  • 重置前文中提到的估计压缩率至max(1.0, 该过大Batch的实际压缩率)

  • 将该Batch解压,并将解压出的消息基于batch.size重新攒批(由于重置了估计压缩率,这会产生多个 Batch),并重新加入发送队列

  • 如果开启了幂等性或事务性,那么为新的多个Batch设置SEQ

  • 释放老的Batch所使用的内存

监控指标

1
2
3
4
5
6
7
8
9
10
11
12
13
Producer暴露的metrics
batch-size-avg、batch-size-max 每个Batch的大小,如果开启了消息压缩,则为压缩后大小
batch-split-rate、batch-split-total Batch分裂的频率与次数
bufferpool-wait-time-ns-total 从Buffer Pool中等待分配内存的耗时
buffer-exhausted-rate、buffer-exhausted-total 从Buffer Pool中分配内存超时的频率与次数
compression-rate-avg Batch的平均压缩率
node-{node}.latency 指定Node响应Produce请求的延时(从发送请求到收到响应),包括成功与失败的所有请求
record-error-rate、record-error-total 发送消息(而非Batch)失败的频率与数量,包括同步调用阶段失败与异步调用阶段失败
record-queue-time-avg、record-queue-time-max Batch从创建到发送等待的耗时
record-send-rate、record-send-total 发送消息的频率和数量
record-size-avg、record-size-max 每个Batch中最大的消息(压缩前)的平均大小与最大大小
records-per-request-avg 每个Produce请求中消息的数量
request-latency-avg、request-latency-max Broker响应Produce请求的延时(从发送请求到收到响应),包括成功与失败的所有请求

常见问题

发送超时

Producer发送超时的可能原因有很多,例如网络问题、Broker负载过高等

  • Callback耗时过长,Producer支持在发送消息时注册回调,但该回调会在Producer的sender线程中执行,如果用户编写的回调方法执行了一些耗时操作,阻塞了sender线程的话,会导致该Producer的其它消息无法被及时发送,进而超时

  • Callback死锁,在Callback中同步调用send方法会导致死锁。比如在Callback方法中检查是否发生错误,如果发生错误则调用prdocuer.send().get()

发送线程被阻塞

尽管Producer在发送消息时是异步的,但仍有一小部分操作是同步执行的。当这些同步操作出于某些原因被阻塞时,会导致调用KafkaProducer#send方法的线程也被阻塞

  • 刷新Metadata超时,Producer在发送消息前可能需要请求Broker刷新Topic元数据,该操作会在send的同步阶段执行。如果Broker出于某些原因无法提供服务或响应超时,会导致Producer被阻塞直至超时

  • Producer Buffer满,当Producer发送消息的速率过快以至于超过Broker的处理能力,或被Broker限流时,未被发送的消息会积攒在内存Buffer Pool中。当Producer Buffer被耗尽时,send方法将被阻塞,直至出现可用Buffer或超时


AutoMq原文: https://mp.weixin.qq.com/s/NSCmB77HPNsaMm4Y_TuV-g