我想知道我能不能得到一些帮助来理解Kafka中的事务,尤其是如何使用transaction.id。上下文如下:
我的kafka应用程序遵循以下模式:使用来自输入主题的消息、处理、发布到输出主题。
我使用的不是kafka流api。
我在一个消费者组中有多个消费者,每个消费者都在自己的轮询线程中。
有一个线程池,其中包含执行消息处理和发布到输出主题的工作线程。目前,每个线程都有自己的producer示例。
我使用PublishedTransactionsAPI来确保消费偏移量的更新和输出主题的发布以原子方式进行
迄今为止,我的假设包括:
如果我的进程在事务中崩溃,那么该事务中的任何内容都不会发布,也不会移动消耗偏移量。因此,在重新启动时,我只需从原始的消费偏移量重新启动事务。
对于producer transaction.id,重要的是它是唯一的。因此,我可以在启动时生成一个基于时间戳的id
然后我读了以下博客:https://www.confluent.io/blog/transactions-apache-kafka/. 特别是在“如何选择事务id”一节中,这似乎意味着我需要保证每个输入分区都有一个producer示例。它说“正确隔离僵尸的关键是确保读进程写周期中的输入主题和分区对于给定的transactional.id总是相同的”。它进一步引用问题示例如下:“例如,在分布式流处理应用程序中,假设主题分区tp0最初由transactional.id t0处理。如果在稍后的某个时刻,它可以Map到另一个transactional.id为t1的生产者,那么t0和t1之间就没有隔离。因此,tp0的消息有可能被重新处理,违反了“一次处理”的保证
我不太明白为什么会这样。在我看来,只要事务是原子的,我就不应该关心生产者如何处理来自任何分区的消息。我已经为此挣扎了一天,不知道是否有人能告诉我我错过了什么。那么,为什么我不能将工作分配给任何transaction.id设置为唯一的生产者示例呢。为什么他们会说,如果你这么做,消息可能会通过事务提供的围栏泄漏呢。
3条答案
按热度按时间kxkpmulp1#
当使用streams api(与常规kafka生产者不同)时,您不必担心设置唯一的
transactional.id
每个流应用程序示例。启用流时exactly_once
在语义上,streams api将基于主题/分区生成正确的/唯一的transactional.id。看看这里到底做了什么:https://github.com/axbaretto/kafka/blob/fe51708ade3cdf4fe9640c205c66e3dd1a110062/streams/src/main/java/org/apache/kafka/streams/processor/internals/streamthread.java#l455
任务(代码中的taskid)说明如下:https://docs.confluent.io/current/streams/architecture.html#stream-分区和任务
yzuktlbb2#
你提到的博客文章包含了你想要的所有信息,尽管它相当密集。
为什么交易?上述条款的一节。
使用为至少一次传递语义配置的vanilla kafka生产者和消费者,流处理应用程序可能会以以下方式完全丢失一次处理语义:
这个
producer.send()
由于内部重试,可能导致重复写入消息b。这是由幂等生产者解决的,而不是本文其余部分的重点。我们可能会重新处理输入消息a,导致重复的b消息被写入到输出中,这违反了只处理一次的语义。如果流处理应用程序在写入b之后但在将a标记为已使用之前崩溃,则可能会发生重新处理。因此,当它恢复时,它将再次消耗a并再次写入b,从而导致重复。
最后,在分布式环境中,应用程序将崩溃或更糟-暂时失去与系统其余部分的连接。通常,新示例会自动启动以替换那些被认为丢失的示例。通过这个过程,我们可能会有多个示例处理相同的输入主题,并写入相同的输出主题,从而导致重复的输出,并违反只处理一次的语义。我们称之为“僵尸示例”问题
来自同一篇文章中的事务语义部分。
僵尸围栏
我们通过要求为每个事务生产者分配一个称为
transactional.id
. 这用于在进程重新启动时标识同一生产者示例[[重点已添加]api要求事务生产者的第一个操作应该是显式注册其
transactional.id
Kafka星团。当它这样做时,Kafka经纪人会检查与给定客户的未结交易transactional.id
完成它们。它还会增加与transactional.id
. epoch是为每个transactional.id
.一旦这个时代被打破,任何生产商
transactional.id
一个旧的时代被认为是僵尸,被隔离,也就是说,来自那些生产者的未来事务写入被拒绝[[重点已添加]以及同一篇文章中的数据流部分。
答:生产者和交易协调人的互动
在执行事务时,生产者在以下几点向事务协调器发出请求:
这个
initTransactions
api寄存器atransactional.id
和协调员在一起。此时,协调器将关闭与该transactional.id
把僵尸赶出了新纪元。每个生产者会话只发生一次[[重点已添加]当生产者将要在事务中第一次向分区发送数据时,首先向协调器注册分区。
当应用程序调用
commitTransaction
或者abortTransaction
,请求被发送到协调器以开始两阶段提交协议。希望这有帮助!
insrf1ej3#
考虑这样一种情况:消费群体群体处于变动中(新的消费群体在线或离线),或者失败场景导致消费群体内主题分区分配的重新平衡。
现在假设一个消费者
C0
以前被分配了分区P0
. 这个使用者正在愉快地处理消息、发布新消息等(标准的consume-transform-publish模式)P0
不客气地(总是想用那个词)从C0
分配给C1
. 从C0
,它可能仍有大量积压的消息要处理,而且它对重新分配的消息一无所知。你最后的处境是C0
以及C1
在很短的一段时间内,他们可能会认为他们都是“拥有者”P0
并将相应地采取行动,在传出主题中创建重复的消息,更糟的是,这些重复的消息可能会出现无序。使用
transactional.id
启用原始博客所指的“围栏”。作为重新分配的一部分,新的生产商将按照增加的纪元编号行事,而现有的生产商仍将使用旧纪元。击剑是微不足道的;把信息放在时代已经过去的地方。Kafka的交易有几个小问题:
入站和出站主题必须位于同一集群上,事务才能正常工作。
命名
transactional.id
是至关重要的生产者'移交',即使你不关心僵尸击剑。新制作人的出现将促使清理失效制作人的任何孤立的飞行中事务,因此要求id在制作人会话中稳定/可重复。不要为此使用随机ID;这不仅会导致不完整的交易(这会阻碍市场中的每个消费者)READ_COMMITTED
模式),但它也会在事务协调器(在代理上运行)上累积额外的状态。默认情况下,此状态将持续7天,因此您不希望突发奇想而产生任意命名的事务生产者。理想的
transactional.id
反映入站主题和分区的组合(当然,除非只有一个分区主题。)实际上,这意味着为分配给使用者的每个分区创建一个新的事务生产者(请记住,在consume trasform发布场景中,生产者也是消费者,消费者分区分配将随每个重新平衡事件而变化。)请看spring kafka实现,它为每个入站分区创建一个新生产者(关于这种方法的安全性,以及是否应该在分区重新分配时清除生产者,还有一些需要说明的问题,但那是另一回事。)击剑机制只在Kafka级别上运作。换言之,它将失效的制片人与Kafka隔离开来,而不是与世界其他地方隔离开来。这意味着,如果生产者还必须更新某些外部状态(在数据库、缓存等中),作为使用转换发布周期的一部分,则应用程序有责任在分区重新分配时将自己与数据库隔离,或者以其他方式确保更新的幂等性。
为了完整起见,值得指出的是,这并不是实现击剑的唯一方法。kafka consumer api确实为用户提供了注册
ConsumerRebalanceListener
,这为被替换的使用者提供了在将分区重新分配给新使用者之前,最后一次排出任何未完成的积压工作(或清除它)的机会。回调阻塞;当它返回时,假设处理程序已经在本地隔离了自己;然后,也只有到那时,新的消费者才能恢复处理。