I am using transactions with Kafka. I have given my consumer container a ChainedKafkaTransactionManager that consists of a JpaTransactionManager and a KafkaTransactionManager.
I am trying to understand how transactions are affected when a consumer gets stuck and therefore sends LeaveGroup and disables its heartbeat thread.
I have set max.poll.interval.ms to 10 seconds.
I have not changed session.timeout.ms. Its default value is 10 seconds.
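For reference, the two timeouts above could be written out as consumer properties roughly like this. It is a minimal sketch with only the standard library: the class name ConsumerTimeoutSettings and the method timeouts() are made up for illustration, and a real consumer needs more settings (bootstrap.servers, group.id, deserializers) that are not shown in this post.

```java
import java.util.Properties;

class ConsumerTimeoutSettings {
    // Builds only the two timeout settings discussed in the question.
    // Hypothetical helper: a real consumer needs further properties.
    static Properties timeouts() {
        Properties props = new Properties();
        // Maximum allowed gap between poll() calls, set to 10 seconds.
        props.setProperty("max.poll.interval.ms", "10000");
        // Not changed in my setup; written out here with the 10 second default I observed.
        props.setProperty("session.timeout.ms", "10000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = timeouts();
        System.out.println("max.poll.interval.ms=" + props.getProperty("max.poll.interval.ms"));
        System.out.println("session.timeout.ms=" + props.getProperty("session.timeout.ms"));
    }
}
```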
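I have two applications, each with one consumer. Both consumers are transactional. Consumer A is rigged to take 30 seconds to process a record, while consumer B processes it within 1 second. Both consumers read from the same topic, which has 3 partitions.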
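1. A record is sent to Kafka.
2. Consumer A receives the record.
3. Consumer A starts processing the record.
4. Consumer A's processing time exceeds max.poll.interval.ms.
5. Consumer A sends LeaveGroup and its heartbeats stop.
6. Kafka rebalances. All partitions are now assigned to consumer B.
7. Consumer B receives the same record and processes it.
8. Consumer B commits its transaction.
9. Consumer A now finishes processing (30 seconds).
10. Consumer A commits its transaction.
11. Kafka rebalances. The partitions are redistributed across both consumers.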
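The point at which each consumer drops out of the group can be sketched with a small calculation. This only models the timing rule as I understand it (a processing run longer than max.poll.interval.ms leads to LeaveGroup once the interval expires); PollIntervalTimeline and its methods are hypothetical names, not part of any Kafka API.

```java
class PollIntervalTimeline {
    /** True when a single processing run outlasts max.poll.interval.ms. */
    static boolean exceedsPollInterval(long processingMs, long maxPollIntervalMs) {
        return processingMs > maxPollIntervalMs;
    }

    /**
     * Milliseconds after the last poll() at which LeaveGroup is expected,
     * or -1 if processing finishes within the interval.
     */
    static long leaveGroupAtMs(long processingMs, long maxPollIntervalMs) {
        return exceedsPollInterval(processingMs, maxPollIntervalMs) ? maxPollIntervalMs : -1;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 10_000;
        // Consumer A processes for 30 s, consumer B for 1 s.
        System.out.println("consumer A leaves at ms: " + leaveGroupAtMs(30_000, maxPollIntervalMs));
        System.out.println("consumer B leaves at ms: " + leaveGroupAtMs(1_000, maxPollIntervalMs));
    }
}
```

With my settings, consumer A is expected to leave about 10 seconds into its run, while consumer B never exceeds the interval.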
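The record is processed and its transaction committed twice. Both consumers have to be idempotent so that processing the same record twice has no consequences.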
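To make clear what I mean by idempotent here, this is a minimal sketch of a handler that skips a record it has already seen, keyed by topic, partition and offset. It is not my actual listener: IdempotentHandler is a hypothetical class, and the in-memory set stands in for a store shared by both applications (for example a database table with a unique key, written in the same JPA transaction as the business data).

```java
import java.util.HashSet;
import java.util.Set;

class IdempotentHandler {
    // Stand-in for a durable, shared store of already processed records.
    private final Set<String> processed = new HashSet<>();
    private int sideEffects = 0;

    /** Applies the side effect at most once per (topic, partition, offset). */
    boolean handle(String topic, int partition, long offset) {
        String key = topic + "-" + partition + "@" + offset;
        if (!processed.add(key)) {
            return false; // duplicate delivery: nothing is applied
        }
        sideEffects++; // placeholder for the real business write
        return true;
    }

    int sideEffects() {
        return sideEffects;
    }

    public static void main(String[] args) {
        IdempotentHandler handler = new IdempotentHandler();
        System.out.println(handler.handle("trans-topic", 0, 407)); // first delivery
        System.out.println(handler.handle("trans-topic", 0, 407)); // redelivery after rebalance
        System.out.println("side effects applied: " + handler.sideEffects());
    }
}
```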
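My assumption was that consumer A would throw an exception because it had left the group and stopped sending heartbeats. That is not what happens. I tested this both with a unique transactional ID per application and with the same transactional ID in both applications, and the result was the same.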
Why does the consumer commit the transaction after it has sent LeaveGroup?
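One thing that stands out in the logs below: LeaveGroup is sent by the consumer's heartbeat thread, while the TxnOffsetCommitRequest is sent by the producer's thread and lists only transactionalId, producerId, producerEpoch, consumerGroupId and the offsets. A possible reading, which is my assumption and not something the logs prove, is that the commit is checked against the producer's epoch and not against group membership. The toy model below only illustrates that hypothesis. ToyTxnCoordinator and all of its methods are hypothetical and are not Kafka broker code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class ToyTxnCoordinator {
    // Latest known epoch per transactional.id (toy state only).
    private final Map<String, Integer> currentEpoch = new HashMap<>();
    // Current group members; deliberately not consulted when committing offsets.
    private final Set<String> groupMembers = new HashSet<>();

    void registerProducer(String transactionalId, int epoch) {
        currentEpoch.put(transactionalId, epoch);
    }

    void join(String memberId) {
        groupMembers.add(memberId);
    }

    void leave(String memberId) {
        groupMembers.remove(memberId);
    }

    boolean isMember(String memberId) {
        return groupMembers.contains(memberId);
    }

    /** Hypothesis: only the producer epoch is checked, not group membership. */
    boolean acceptOffsetCommit(String transactionalId, int producerEpoch, String consumerGroupId) {
        Integer known = currentEpoch.get(transactionalId);
        return known != null && known == producerEpoch;
    }

    public static void main(String[] args) {
        ToyTxnCoordinator coordinator = new ToyTxnCoordinator();
        coordinator.registerProducer("transactionId420", 64);
        coordinator.join("consumer-1");
        coordinator.leave("consumer-1"); // LeaveGroup after the poll interval expired
        System.out.println("still a member: " + coordinator.isMember("consumer-1"));
        System.out.println("commit accepted: "
                + coordinator.acceptOffsetCommit("transactionId420", 64, "mygrp42"));
    }
}
```

If this reading is right, it would explain why consumer A sees no exception, but I have not confirmed it.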
I am not entirely sure whether this is a bug. Nevertheless, I have reported this odd behavior (or bug) to Apache Kafka.
Log of consumer A
2018-07-11 11:59:22.365 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Received: 1 records
2018-07-11 11:59:22.366 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ntainer#0-0-C-1] o.s.k.t.KafkaTransactionManager : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2018-07-11 11:59:22.366 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ntainer#0-0-C-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=transactionId420] Transition from state READY to IN_TRANSACTION
2018-07-11 11:59:22.366 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ntainer#0-0-C-1] o.s.k.t.KafkaTransactionManager : Created Kafka transaction on producer [brave.kafka.clients.TracingProducer@631071b0]
2018-07-11 11:59:22.366 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ntainer#0-0-C-1] o.s.orm.jpa.JpaTransactionManager : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2018-07-11 11:59:22.366 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ntainer#0-0-C-1] o.s.orm.jpa.JpaTransactionManager : Opened new EntityManager [SessionImpl(PersistenceContext[entityKeys=[],collectionKeys=[]];ActionQueue[insertions=ExecutableList{size=0} updates=ExecutableList{size=0} deletions=ExecutableList{size=0} orphanRemovals=ExecutableList{size=0} collectionCreations=ExecutableList{size=0} collectionRemovals=ExecutableList{size=0} collectionUpdates=ExecutableList{size=0} collectionQueuedOps=ExecutableList{size=0} unresolvedInsertDependencies=null])] for JPA transaction
2018-07-11 11:59:22.366 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ntainer#0-0-C-1] o.h.e.t.internal.TransactionImpl : begin
2018-07-11 11:59:22.367 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ntainer#0-0-C-1] o.s.orm.jpa.JpaTransactionManager : Exposing JPA transaction as JDBC transaction [org.springframework.orm.jpa.vendor.HibernateJpaDialect$HibernateConnectionHandle@4eef56f0]
2018-07-11 11:59:22.427 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ntainer#0-0-C-1] l.a.BatchMessagingMessageListenerAdapter : Processing [GenericMessage [payload=[data], headers={kafka_offset=[34], kafka_consumer=brave.kafka.clients.TracingConsumer@1484643f, kafka_timestampType=[CREATE_TIME], kafka_receivedMessageKey=[null], kafka_receivedPartitionId=[1], kafka_receivedTopic=[trans-topic], kafka_receivedTimestamp=[1531299912221], kafka_batchConvertedHeaders=[{X-B3-SpanId=[B@1e664339, X-B3-ParentSpanId=[B@73f2c38a, X-B3-Sampled=[B@5f0ca155, X-B3-TraceId=[B@68ac877c}]}]]
...
2018-07-11 11:59:30.503 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [hread | mygrp42] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=mygrp42] Sending Heartbeat request to coordinator localhost:9092 (id: 2147483647 rack: null)
2018-07-11 11:59:30.608 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [hread | mygrp42] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=mygrp42] Received successful Heartbeat response
2018-07-11 11:59:32.256 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [hread | mygrp42] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=mygrp42] Sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null)
2018-07-11 11:59:32.256 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [hread | mygrp42] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=mygrp42] Disabling heartbeat thread
2018-07-11 11:59:37.458 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Sending offsets to transaction: {trans-topic-0=OffsetAndMetadata{offset=408, metadata=''}}
2018-07-11 11:59:37.465 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ntainer#0-0-C-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=transactionId420] Begin adding offsets {trans-topic-0=OffsetAndMetadata{offset=408, metadata=''}} for consumer group mygrp42 to transaction
2018-07-11 11:59:37.465 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ntainer#0-0-C-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=transactionId420] Enqueuing transactional request (type=AddOffsetsToTxnRequest, transactionalId=transactionId420, producerId=0, producerEpoch=64, consumerGroupId=mygrp42)
2018-07-11 11:59:37.465 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-1, transactionalId=transactionId420] Sending transactional request (type=AddOffsetsToTxnRequest, transactionalId=transactionId420, producerId=0, producerEpoch=64, consumerGroupId=mygrp42) to node localhost:9092 (id: 0 rack: null)
2018-07-11 11:59:37.467 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=transactionId420] Successfully added partition for consumer group mygrp42 to transaction
2018-07-11 11:59:37.467 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-1, transactionalId=transactionId420] Sending transactional request (type=TxnOffsetCommitRequest, transactionalId=transactionId420, producerId=0, producerEpoch=64, consumerGroupId=mygrp42, offsets={trans-topic-0=CommittedOffset(offset=408, metadata='')}) to node localhost:9092 (id: 0 rack: null)
2018-07-11 11:59:37.469 DEBUG [kafka-transaction-microservice-example,,,] 46299 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=transactionId420] Successfully added offsets {trans-topic-0=CommittedOffset(offset=408, metadata='')} from consumer group mygrp42 to transaction.
Log of consumer B
2018-07-11 11:59:33.777 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Received: 1 records
2018-07-11 11:59:33.777 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ntainer#0-0-C-1] o.s.k.t.KafkaTransactionManager : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2018-07-11 11:59:33.777 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ntainer#0-0-C-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2, transactionalId=transactionId421] Transition from state READY to IN_TRANSACTION
2018-07-11 11:59:33.777 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ntainer#0-0-C-1] o.s.k.t.KafkaTransactionManager : Created Kafka transaction on producer [brave.kafka.clients.TracingProducer@30b6dff4]
2018-07-11 11:59:33.778 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ntainer#0-0-C-1] o.s.orm.jpa.JpaTransactionManager : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2018-07-11 11:59:33.778 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ntainer#0-0-C-1] o.s.orm.jpa.JpaTransactionManager : Opened new EntityManager [SessionImpl(PersistenceContext[entityKeys=[],collectionKeys=[]];ActionQueue[insertions=ExecutableList{size=0} updates=ExecutableList{size=0} deletions=ExecutableList{size=0} orphanRemovals=ExecutableList{size=0} collectionCreations=ExecutableList{size=0} collectionRemovals=ExecutableList{size=0} collectionUpdates=ExecutableList{size=0} collectionQueuedOps=ExecutableList{size=0} unresolvedInsertDependencies=null])] for JPA transaction
2018-07-11 11:59:33.778 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ntainer#0-0-C-1] o.h.e.t.internal.TransactionImpl : begin
2018-07-11 11:59:33.778 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ntainer#0-0-C-1] o.s.orm.jpa.JpaTransactionManager : Exposing JPA transaction as JDBC transaction [org.springframework.orm.jpa.vendor.HibernateJpaDialect$HibernateConnectionHandle@2b0eff5d]
2018-07-11 11:59:33.778 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ntainer#0-0-C-1] l.a.BatchMessagingMessageListenerAdapter : Processing [GenericMessage [payload=[data], headers={kafka_offset=[407], kafka_consumer=brave.kafka.clients.TracingConsumer@103b53ce, kafka_timestampType=[CREATE_TIME], kafka_receivedMessageKey=[null], kafka_receivedPartitionId=[0], kafka_receivedTopic=[trans-topic], kafka_receivedTimestamp=[1531299562360], kafka_batchConvertedHeaders=[{X-B3-SpanId=[B@38658f41, X-B3-ParentSpanId=[B@2faeb13a, X-B3-Sampled=[B@29e18244, X-B3-TraceId=[B@75496432}]}]]
2018-07-11 11:59:33.778 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Sending offsets to transaction: {trans-topic-0=OffsetAndMetadata{offset=408, metadata=''}}
2018-07-11 11:59:33.778 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ntainer#0-0-C-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2, transactionalId=transactionId421] Begin adding offsets {trans-topic-0=OffsetAndMetadata{offset=408, metadata=''}} for consumer group mygrp42 to transaction
2018-07-11 11:59:33.778 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ntainer#0-0-C-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2, transactionalId=transactionId421] Enqueuing transactional request (type=AddOffsetsToTxnRequest, transactionalId=transactionId421, producerId=1, producerEpoch=11, consumerGroupId=mygrp42)
2018-07-11 11:59:33.779 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ad | producer-2] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-2, transactionalId=transactionId421] Sending transactional request (type=AddOffsetsToTxnRequest, transactionalId=transactionId421, producerId=1, producerEpoch=11, consumerGroupId=mygrp42) to node localhost:9092 (id: 0 rack: null)
2018-07-11 11:59:33.780 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2, transactionalId=transactionId421] Successfully added partition for consumer group mygrp42 to transaction
2018-07-11 11:59:33.780 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ad | producer-2] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-2, transactionalId=transactionId421] Sending transactional request (type=TxnOffsetCommitRequest, transactionalId=transactionId421, producerId=1, producerEpoch=11, consumerGroupId=mygrp42, offsets={trans-topic-0=CommittedOffset(offset=408, metadata='')}) to node localhost:9092 (id: 0 rack: null)
2018-07-11 11:59:33.781 DEBUG [kafka-transaction-microservice-example,,,] 46288 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2, transactionalId=transactionId421] Successfully added offsets {trans-topic-0=CommittedOffset(offset=408, metadata='')} from consumer group mygrp42 to transaction.
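To make the ordering in the two logs easier to follow, the gaps between a few of the timestamps above can be computed with a small helper. This is only a convenience sketch with the standard library; LogGaps and millisBetween are hypothetical names, and the timestamps are copied from the log lines above.

```java
import java.time.Duration;
import java.time.LocalTime;

class LogGaps {
    /** Milliseconds between two HH:mm:ss.SSS timestamps on the same day. */
    static long millisBetween(String from, String to) {
        return Duration.between(LocalTime.parse(from), LocalTime.parse(to)).toMillis();
    }

    public static void main(String[] args) {
        // Consumer A: "Received: 1 records" to "Sending LeaveGroup request"
        System.out.println(millisBetween("11:59:22.365", "11:59:32.256")); // 9891
        // Consumer A's LeaveGroup to consumer B's "Received: 1 records"
        System.out.println(millisBetween("11:59:32.256", "11:59:33.777")); // 1521
        // Consumer A's LeaveGroup to consumer A's "Sending offsets to transaction"
        System.out.println(millisBetween("11:59:32.256", "11:59:37.458")); // 5202
    }
}
```

So consumer A sends LeaveGroup roughly 10 seconds after receiving the record, which matches max.poll.interval.ms, and it still sends its offsets to the transaction about 5 seconds after leaving the group.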