从恰好一次kip开始,关于应用程序重启时的生产者幂等性 InitPidRequest
:
2.1如果设置了transactional.id配置,则指定transactionalid时,该transactionalid将与initpidrequest一起传递,并且到相应pid的Map将记录在步骤2a的事务日志中。这使我们能够将transactionalid的相同pid返回给生产者的未来示例,从而可以恢复或中止以前不完整的事务。
除了返回pid之外,initpidrequest还执行以下任务:
将pid的epoch加起来,这样,生产者的任何前一个zombie示例都会被隔离,并且不能继续处理它的事务。
恢复(前滚或后滚)生产者的前一个示例留下的任何未完成的事务。initpidrequest的处理是同步的。一旦返回,生产者就可以发送数据并开始新的事务。
当生产者失败,重新启动和 InitPidRequest
执行时,最后一个事务是“前滚”(我猜这意味着已提交)还是“后滚”?这是如何控制的?
1条答案
按热度按时间lokaqttq1#
使kafka能够实现这一点的关键组件是事务协调器。这是你提到的kip的一部分。事务协调器由代理构造,作为初始化过程的一部分,并在内存中维护以下信息:
Map来自
TransactionalId
分配给PID
,当前epoch编号(unix timestamp)和事务超时值Map来自
PID
生产商当前正在进行的交易状态PID
,以及上次更新此状态的时间现在,回答您关于向前或向后滚动事务的问题:
当生产者失败并重新启动时,它会发送一个新的
InitPidRequest
如果生产者提供了非空的TransactionalId
(由producer应用程序作为配置参数提供)。事务协调器在接收到这个请求后,会检查是否已经有一个带有所提供的
TransactionalId
在内存Map中(上面的第1点)。如果存在Map,它将查找PID
在第二个内存Map(上面的第2点)中,检查是否有针对该Map的正在进行的事务PID
:如果有一个正在进行的事务处于启动状态,即。
BEGIN
,则事务将被中止(注意:这是回滚版本)如果有一个正在进行的事务已经启动并且处于
PREPARE_ABORT
或者PREPARE_COMMIT
,则事务协调器将等待事务通过COMPLETE_ABORT
(回滚版本)或COMPLETE_COMMIT
(前滚版本)。之后,事务协调器用最新的
PID
以及TransactionalId
然后生产者可以开始发送新的事务。我尽量把解释控制在最低限度,但如果你对更多细节感兴趣,那么这里有详细的设计文件供你参考。
我希望这有帮助!