Kafka Producer(下)
words: 3.9k views: time: 14min关于Kafka Producer的实现细节与常见问题
幂等性
在一个分布式消息系统中,各个角色均有可能发生故障。以Apache Kafka为例,Broker和Client都有可能会崩溃,Broker与Client之间的网络请求与响应都有可能丢失。
根据Producer处理这类故障时采取的策略,可以分为以下几种语义:
- 至少一次(At Least Once):当发生请求超时或者服务端错误时,Producer重复尝试发送消息直至成功。这样做可以保证每条消息都被写入Topic,但是可能会发生重复
- 至多一次(At Most Once):在超时或报错时Producer不进行重试,每条消息仅发送一次。这样做可以避免消息重复,但也可能会导致消息丢失
- 精确一次(Exactly Once):Producer进行适当的重试,以确保每条消息会且仅会被写入Topic 一次,既不重复,也不遗漏。Exactly Once的语义是最理想的实现,可以满足绝大多数业务场景的需求,但也是最难实现的,它需要Client与Broker之间的配合
开启幂等性
Kafka Producer开启幂等性只需要设置几个配置项,无需修改代码
相关配置
- acks:当指定数量的副本收到消息后,Producer才会认为消息写入完成,默认为all
acks=all:Producer会等待所有同步的(in sync)副本的响应
acks=1:Producer会等待leader broker的响应
acks=0:Producer不会等待任何broker的响应,消息写入网络层后即认为写入成功
- enable.idempotence:开启幂等性,保证每条消息写入且仅被写入一次,同时保证消息按照发送顺序写入,默认为true
开启此配置时,需要保证max.in.flight.requests.per.connection(在收到Broker响应ack之前,最多可以发送的未完成请求数量)不大于5,retries大于0,acks设置为all
Producer只能避免由自身重试策略(Producer、Broker 或网络出错)导致的消息重复,无法处理以下几种情况:
只保证会话级别的不重不漏,当Producer发生重启时,无法保证重启后与重启前发送的消息不重复
只保证Partition级别的不重不漏,不能保证向多个Partition发送的消息不重复
如果发送耗时超过了delivery.timeout.ms,Producer抛出TimeoutException,此时无法保证对应的消息是否已经被Broker持久化,需要上层根据情况进行处理
实现原理
为了实现幂等性,Kafka引入了以下两个概念:
- Producer ID(以下简称 PID):Producer的唯一标识。PID由Idempotent Producer在首次发送消息前,请求Broker分配获得,是全局唯一的。PID仅在Producer和Broker内部使用,不会暴露给Client使用者
- Sequence Number(以下简称 SEQ):消息的序列号。该序列号在(PID, Partition) 维度上严格递增。事实上SEQ会存储在Record Batch的头中,作为Batch中第一条消息的SEQ,Batch中其它消息的SEQ依次递增
另外还有Producer Epoch,它与PID结合才会唯一标识一个Producer,它的在不同的场景下有不同的用途:
对于开启事务能力的Producer(配置了transactional.id),Producer Epoch同样由Broker分配。这样做可以保证,多个具有相同Transactional ID的Producer中仅会有一个生效,即Fence Producer
对于没有开启事务能力的Producer,Producer Epoch则由Producer自己维护,它会在需要重置序列号(Reset SEQ)时增长,并将SEQ重置到0
对于PID与SEQ,均会跟随消息持久化到Log中
服务端
Broker会在内存中记录每个Producer的状态信息,包括Producer Epoch与每个Partition最新写入的5个Record Batch的元数据(包括SEQ、offset、timestamp等),用于判断Producer发送的请求是否存在重复或者遗漏
另外,这些状态信息也会定期进行快照,Broker在重启时会基于快照与Log中的信息恢复出这些状态信息(这里硬编码的5也是Producer配置 max.in.flight.requests.per.connection的上限)
当Broker收到一个Record Batch后,在进行完必要前置操作后、真正持久化到Log前,会检查该Batch上的PID、Producer Epoch与SEQ,具体地说:
检查该Record Batch是否与本地记录的5个Record Batch一致。若是,则认为Producer出于某些原因重复发送了该Record Batch,不进行任何操作,直接返回本地记录的元数据(主要是offset)
检查之前是否记录了该PID对应的状态信息,若没有,检查SEQ是否为0
- 若是,则认为这是一个全新的Producer,记录该Producer相关信息,并写入Record Batch
- 若否,则报错 UnknownProducerIdException
- 检查Producer Epoch是否与本地记录一致,若不一致,检查SEQ是否为0
- 若是,则认为该Producer出于某些原因重置了SEQ,更新记录,并写入Record Batch
- 若否,则报错OutOfOrderSequenceException
- 检查SEQ是否与最近一次写入的Record Batch的SEQ连续
- 若是,则缓存该Record Batch的元数据,并写入
- 若否,则报错OutOfOrderSequenceException
经过上述校验处理,可以确保在客户端侧,由同一个Producer向同一个Partition写入的Record Batch都是连续的(基于 SEQ),不会存在遗漏或重复
客户端
Producer对于幂等性的处理相对更加复杂,主要有以下两个难点:
Producer在发送时可能会发生超时,在超时时,可能存在两种可能:“Broker没有收到请求” 或 “Broker处理了请求,但Producer没有收到响应”。这就导致Producer难以确认当某个Produce请求超时时,Broker是否已经进行了持久化
Producer可能会向同一个Broker同时发送多个Produce请求,当其中一个或多个报错时,需要根据不同情况,对它们以及后续的请求采取不同的处理方式
基本概念
- 在途Batch(Inflight Batch)
Producer会按Partition维度,记录已经发送请求但尚未收到响应的Batch;特别地,对于幂等Producer,还会额外记录每个Inflight Batch的SEQ,并按照SEQ排序
- 未解决的Batch(Unresolved Batch)
Producer在发送消息时会进行数次重试,直至总耗时超出delivery.timeout.ms。如果某个Batch发生了Delivery Timeout,则认为其为Unresolved
当某个Batch被标记为Unresolved时,Producer无法判断Broker是否已经持久化这个Batch,只能通过检查这个Batch的后续Batch是否被Broker持久化(或报错OutOfOrderSequenceException):若后续Batch写入成功,则认为它之前的Unresolved Batch也已经写入完成;否则,则认为前面的Unresolved Batch没有写入完成,需要重置SEQ
- 提升Epoch(Bump Epoch)与重置SEQ(Reset Sequence Number)
当Producer遇到无法通过重试解决的问题时(例如Inflight Batch均响应完成,但仍存在Unresolved Batch时;或Broker报错UnknownProducerIdException时),会执行Bump Epoch & Reset SEQ的操作
具体会将Producer Epoch加一,并将出错Partition的所有Inflight Batch从零开始重新编号重新发送,并清空Unresolved Batch
发送流程
- 判断Unresolved Batches的状态
- 如果确认Unresolved Batch实际已写入完成,则将其从Unresolved Batches中移除
- 如果确认Unresolved Batch实际并没有写入(判断Inflight Batches是否为空),则Bump Epoch & Reset SEQ
- 检查目前该Partition能否发送新的Batch,不能发送的场景有:
- 存在Unresolved Batch
- 之前发生了Bump Epoch,且仍存在老的Epoch的Inflight Batch
- 之前某个Batch正在重试(也就是说,幂等Producer在重试时,Inflight始终为1)
如果之前发生了Bump Epoch,且已经不存在老Epoch的Inflight Batch,则Reset SEQ
获取对应Partition的下一个SEQ,并设置到Batch中
将Batch加入到Inflight Batches中
检查是否存在Delivery Timeout的Batch,若存在,则将其加入到Unresolved Batches中
向Broker发送Produce请求,等待响应
收到响应后,检查Error Code
- 若为不可重试错误(例如AuthorizationException),则Bump Epoch & Reset SEQ,并向上层报错
- 若为可重试错误(例如TimeoutException),则加入重试队列,等待下次发送。
如果报错为UnknownProducerIdException,且之前没有Reset SEQ,则Bump Epoch & Reset SEQ并重试;否则直接重试
如果报错为OutOfOrderSequenceException,且Unresolved Batch为空,或该Batch恰好为SEQ最大的Unresolved Batch的下一个,则Bump Epoch & Reset SEQ并重试;否则直接重试
- 从Inflight Batches中移除,并向上层返回成功
关于Inflight Request上限
Producer的配置max.in.flight.requests.per.connection存在上限5,这同时也是Broker缓存每个PID在每个Partition发送过的最新的Batch的数量。这样做的原因是,当Inflight Request数量超过Broker缓存的Batch数量时(假设 1),存在以下反例
Producer向Broker先后发送了两个Produce Request,且这两个请求中,均包含一个发送给Partition p1的Batch,记为b1与b2,其中b1 SEQ < b2 SEQ
Broker将b1 与 b2依次持久化完成(此时Broker缓存中会记录b2的元数据),但由于网络问题,Producer没有收到响应
Producer发现超时后重试,重新发送包含b1的Produce Request
Broker收到Request后发现b1 SEQ小于缓存中的b2 SEQ,可以推测出该消息为重复的,不应写入,而是直接返回offset等信息;但由于缓存中并没有b1相关元数据,Broker也就无法返回offset信息
其他细节
- Producer Epoch 溢出处理
当Producer Epoch溢出时(short最大为32767),Producer会将PID与Epoch重置,并向Broker请求分配一个新的PID与Epoch,并Reset SEQ
- SEQ 溢出处理
当SEQ溢出时(int最大为2147483647),下一条消息的SEQ会轮转回0。考虑到Inflight Batch的数量与Batch中消息的数量的限制,不会发生问题
- UnknownProducerIdException 处理
通常的报错场景:由于Log Retention限制,Broker将Log中某个Producer发送的消息均删除了,此时Broker重启,缓存中不再有该Producer的状态信息。如果此时Producer尝试接着之前的SEQ发送消息,由于Broker无法识别PID,则会报错。这种情况,Producer只需Bump Epoch并 Reset SEQ,重新发送消息即可
场景示例
Broker没有收到请求
Producer没有收到响应
实现细节
消息压缩
Kafka Producer支持在客户端对消息进行压缩,以减少消息的网络传输成本与存储成本。可以通过Producer配置compression.type来指定压缩算法,支持的选项有none、gzip、snappy、lz4、zstd,默认为none
开启压缩后,可以节约网络带宽与Broker存储空间,但是会增加Producer与Broker的CPU消耗。此外,由于压缩是以Batch维度进行的,更好的攒批(更大的 Batch)会带来更好的压缩效果
在实现消息压缩时,会存在这样一个矛盾:只有在真正将消息压缩到Batch中之后,才能判断它实际(压缩后)占用了多大的大小;但为了不超过batch.size的限制,需要在消息写入Batch之前就判断其压缩后的大小。
为了解决这个问题,Kafka提出了一个自适应的压缩率估计算法:
维护一个Map,其中记录了每个Topic上各个压缩算法的“估计压缩率”,初始值为1.0
当某个Batch写满并压缩完成后,计算其实际压缩率(压缩后大小 / 压缩后大小)
基于这个实际压缩率调整估计压缩率
- 如果实际压缩率 < 估计压缩率,将估计压缩率向实际压缩率靠近,最大减少0.005
- 如果实际压缩率 > 估计压缩率,将估计压缩率向实际压缩率靠近,最大增加 0.05
- 在尝试向新的Batch写入消息时,将使用新的估计压缩率 * 1.05 作为估算值
Batch分裂
为了应对极端情况(消息可压缩性波动导致估计值大幅偏离实际值),Kafka支持了Batch分裂的逻辑。当压缩率估计值大幅低于实际值时,可能会导致在一个Batch中写入了过多的消息以至于超出了Broker或Topic的限制(message.max.bytes 或 max.message.bytes)
当发生这样的问题时,就需要Producer将过大的Batch拆分开并重新发送,具体流程如下:
Producer收到MESSAGE_TOO_LARGE报错
重置前文中提到的估计压缩率至max(1.0, 该过大Batch的实际压缩率)
将该Batch解压,并将解压出的消息基于batch.size重新攒批(由于重置了估计压缩率,这会产生多个 Batch),并重新加入发送队列
如果开启了幂等性或事务性,那么为新的多个Batch设置SEQ
释放老的Batch所使用的内存
监控指标
1 | Producer暴露的metrics |
常见问题
发送超时
Producer发送超时的可能原因有很多,例如网络问题、Broker负载过高等
Callback耗时过长,Producer支持在发送消息时注册回调,但该回调会在Producer的sender线程中执行,如果用户编写的回调方法执行了一些耗时操作,阻塞了sender线程的话,会导致该Producer的其它消息无法被及时发送,进而超时
Callback死锁,在Callback中同步调用send方法会导致死锁。比如在Callback方法中检查是否发生错误,如果发生错误则调用prdocuer.send().get()
发送线程被阻塞
尽管Producer在发送消息时是异步的,但仍有一小部分操作是同步执行的。当这些同步操作出于某些原因被阻塞时,会导致调用KafkaProducer#send方法的线程也被阻塞
刷新Metadata超时,Producer在发送消息前可能需要请求Broker刷新Topic元数据,该操作会在send的同步阶段执行。如果Broker出于某些原因无法提供服务或响应超时,会导致Producer被阻塞直至超时
Producer Buffer满,当Producer发送消息的速率过快以至于超过Broker的处理能力,或被Broker限流时,未被发送的消息会积攒在内存Buffer Pool中。当Producer Buffer被耗尽时,send方法将被阻塞,直至出现可用Buffer或超时