kotlin Kafka Parallel Consumer没有在不同的进程之间拆分工作

zc0qhyus  于 2022-11-16  发布在  Kotlin
关注(0)|答案(1)|浏览(112)

我使用confluent parallel-consumer来实现快速写入不同的数据存储。我实现了我的代码,并且在本地使用dockers时一切都运行得很好。有一次我启动了几个主机,其中有几个消费者(具有相同的组ID)我注意到只有一个节点(processes)确实在消耗数据。我正在阅读的主题有24个分区,我有3个不同的节点,我预计Kafka会在它们之间划分工作。
以下是我的部分代码:

fun buildConsumer(config: KafkaConsumerConfig): KafkaConsumer<String, JsonObject> {
    val props = Properties()
    props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = config.kafkaBootstrapServers
    props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
    props[ConsumerConfig.GROUP_ID_CONFIG] = "myGroup"
    // Auto commit must be false in parallel consumer
    props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false
    props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
    props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonObjectDeSerializer::class.java.name
    val consumer = KafkaConsumer<String, JsonObject>(props)
    
    return consumer
}

private fun createReactParallelConsumer(): ReactorProcessor<String, JsonObject> {
    val options = ParallelConsumerOptions.builder<String, JsonObject>()
        .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
        .maxConcurrency(10)
        .batchSize(1)
        .consumer(buildConsumer(kafkaConsumerConfig))
        .build()
    return ReactorProcessor(options)
}

还有我的主要代码:

pConsumer = createReactParallelConsumer()
    pConsumer.subscribe(UniLists.of(kafkaConsumerConfig.kafkaTopic))
    pConsumer.react { context ->
        batchProcessor.processBatch(context)
    }

如果您有任何建议,我们将不胜感激

laik7k3q

laik7k3q1#

我们遇到了一个在www.example.com https://github.com/confluentinc/parallel-consumer/issues/409版本中已关闭的问题0.5.2.4
Parallel客户端保留了旧的未完成的偏移,因为我们的消费者很慢(许多不同的原因),我们到达了保留的末尾(最早的策略),所以每次我们重新启动消费者时,它都在扫描所有那些不兼容的偏移(它没有截断它们- AKA bug)。修复只是将版本从www.example.com更新0.5.2.3到0.5.2.4

相关问题