chainedtransactionmanager在不同线程中调用aftercompletion

x4shl7ld  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(285)

我用的是 ChainedTransactionManagerKafkaTransactionManager 以及 JpaTransactionManager . 我用一个线程局部变量来保存一个id。
我有这样一个流程:
http call=>http filter(sets tl var)=>发送spring事件的逻辑=>transactionaleventlistener(phase=beforecommit)接收并发送给kafka(使用拦截器将tl添加为事件头并清除tl)
这是应该发生的,但这是正在发生的。
... => 发送spring事件=>另一个事件的逻辑获取线程,完成,清理tl=>返回到事件侦听器,接收事件,但此时tl已清除,kafka发布失败。
所有这些都发生在同一个线程中。尽管spring事件应该是同步的(除了 @Async ,我不知道) TransactionalEventListener 正在另一个线程中调用。
我所说的“另一个事件占用线程”是指当前使用者线程完成(尽管它没有真正完成,因为侦听器代码仍然丢失),并且另一个使用者使用同一个线程来使用另一个事件。当消耗完最后一个事件后,该线程返回到txneventlistener并清除tl。
这是异常堆栈跟踪:

at org.apache.kafka.clients.producer.internals.ProducerInterceptors.onSend(ProducerInterceptors.java:61)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:855)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:569)
    at io.opentracing.contrib.kafka.TracingKafkaProducer.send(TracingKafkaProducer.java:116)
    at io.opentracing.contrib.kafka.TracingKafkaProducer.send(TracingKafkaProducer.java:97)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:404)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:222)
    at org.springframework.kafka.core.KafkaTemplate.sendDefault(KafkaTemplate.java:200)
    at revisions.messaging.publisher.implementation.RevisionsKafkaPublisher.sendDefault(RevisionsKafkaPublisher.kt:8)
    at revisions.messaging.publisher.implementation.RevisionsEventsPublisher.handle(RevisionsEventsPublisher.kt:17)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:300)
    at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:190)
    at org.springframework.transaction.event.ApplicationListenerMethodTransactionalAdapter$TransactionSynchronizationEventAdapter.processEvent(ApplicationListenerMethodTransactionalAdapter.java:129)
    at org.springframework.transaction.event.ApplicationListenerMethodTransactionalAdapter$TransactionSynchronizationEventAdapter.afterCompletion(ApplicationListenerMethodTransactionalAdapter.java:118)
    at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCompletion(TransactionSynchronizationUtils.java:171)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.invokeAfterCompletion(AbstractPlatformTransactionManager.java:990)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerAfterCompletion(AbstractPlatformTransactionManager.java:965)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:786)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:712)
    at org.springframework.data.transaction.MultiTransactionStatus.commit(MultiTransactionStatus.java:74)
    at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:150)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1418)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1398)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1165)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:949)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:884)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:745)

有没有办法避免打电话给 AfterCompletion 在另一条线上?事件处理是同步的,不是吗 @Async 或者类似的东西。我不需要多线程来消费一个请求/事件。
我认为事务同步和chainedtransactionmanager有些问题,但我不知道是什么。
谢谢!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题