新的kafka版本(0.11)支持一次语义。
https://cwiki.apache.org/confluence/display/kafka/kip-98+-+exactly+once+delivery+and+transactional+messaging
我有一个生产者的设置与Kafka事务代码在java中像这样。
producer.initTransactions();
try {
producer.beginTransaction();
for (ProducerRecord<String, String> record : payload) {
producer.send(record);
}
Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
{
put(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(42L, null));
}
};
producer.sendOffsetsToTransaction(groupCommit, "groupId");
producer.commitTransaction();
} catch (ProducerFencedException e) {
producer.close();
} catch (KafkaException e) {
producer.abortTransaction();
}
我不太清楚如何使用sendoffsetstotransaction和它的预期用例。另外,消费组是消费端的多线程读取功能。
javadoc说
“将消耗的偏移量列表发送给使用者组协调器,并将这些偏移量标记为当前事务的一部分。只有在事务提交成功的情况下,才会将这些偏移量视为已使用。当您需要将已使用的消息和已生成的消息一起批处理时,应该使用此方法,通常是在“使用-转换-生成”模式中
生产部门如何维护已消耗补偿的列表?这有什么意义?
1条答案
按热度按时间u1ehiz5o1#
这只与您正在使用的工作流相关,然后根据您使用的内容生成消息。此函数允许您仅在下游生成成功时提交消耗的偏移量。如果您使用数据,以某种方式对其进行处理,然后生成结果,这将在整个使用/生产过程中启用事务性保证。
如果没有事务,通常使用
Consumer#commitSync()
或者Consumer#commitAsync()
提交消费者补偿。但是,如果您在使用生产者制作之前使用这些方法,那么在知道生产者是否成功发送之前,您就已经提交了偏移量。因此,您可以使用
Producer#sendOffsetsToTransaction()
而不是由生产者提交偏移量。这会将偏移发送到处理事务的事务管理器。只有当整个事务消费和生产成功时,它才会提交偏移量。(注意:发送要提交的偏移量时,应在上次读取的偏移量中添加1,以便将来的读取从尚未读取的偏移量恢复。无论您是向消费者还是生产者承诺,这都是正确的。请参阅:kafkaproducer sendfoffsetstotransaction need offset+1以成功提交当前偏移量)。