我知道Kafka的声明,Kafka消费是线程安全。
所以我这样做了:(斯卡拉)
val m = Map(new TopicPartition(msg.topic(), msg.partition()) -> new OffsetAndMetadata(msg.offset()))
consumer.synchronized{ consumer.commitSync(m) }
我将对consumer的访问放在一个synchronized块中,但在与consumer.commitsync(m)的行中仍然出现concurrentmodificationexception错误。
为什么,我能怎么办?
我使用的是akka流,所以一定会有线程的秘密,但是同步块不应该处理这个吗?
1条答案
按热度按时间qvsjd97n1#
文档中的一种方法是为kafkaconsumer创建一个单独的线程,并使用某种并发队列与外部工作进行通信。