如何修复kafka中的concurrentmodificationexception错误(0.9.0.1)

koaltpgm  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(430)

我知道Kafka的声明,Kafka消费是线程安全。
所以我这样做了:(斯卡拉)

val m = Map(new TopicPartition(msg.topic(), msg.partition()) -> new OffsetAndMetadata(msg.offset()))
consumer.synchronized{ consumer.commitSync(m) }

我将对consumer的访问放在一个synchronized块中,但在与consumer.commitsync(m)的行中仍然出现concurrentmodificationexception错误。
为什么,我能怎么办?
我使用的是akka流,所以一定会有线程的秘密,但是同步块不应该处理这个吗?

qvsjd97n

qvsjd97n1#

文档中的一种方法是为kafkaconsumer创建一个单独的线程,并使用某种并发队列与外部工作进行通信。

相关问题