我们使用的处理器只传递一次(通过producer提交consumer offset),并且需要了解在使用kafka-cluster-1中某个主题的消息并生成kafka-cluster-2中某个主题时是否可能发生这种情况(反之亦然)。
这是事务处理器的一个片段:
messageProducer.beginTransaction(partitionId)
resultPublisher.publish(partitionId, resultTopic, messageRecord.key(), result)
val offsetAndMetadata = messageConsumer.getUncommittedOffsets(listenTopic, messageRecord)
messageProducer.sendOffsetsToTransaction(partitionId, offsetAndMetadata, consumerGroupId)
messageProducer.commitTransaction(partitionId)
我的理解是,生产者将尝试提交同一集群中消费者主题的偏移量。
我做了一些研究,但没有找到任何与多个集群相关的东西。
有可能吗?
1条答案
按热度按时间w9apscun1#
您可以“手动”将偏移发送到同一集群上的您自己的主题,在该集群中发送生成的消息。这样您就可以使用事务提供的担保。
您需要为偏移量创建自己的主题,类似于kafka的internal\uu consumer\u offset,其中,作为键,您应该使用groupid、topic、partition,并将最近的偏移量(已读或待读)作为值。记住使用原木压缩。
afaik不可能跨两个不同的集群进行事务处理。