我使用confluent parallel-consumer来实现快速写入不同的数据存储。我实现了我的代码,并且在本地使用dockers时一切都运行得很好。有一次我启动了几个主机,其中有几个消费者(具有相同的组ID)我注意到只有一个节点(processes)确实在消耗数据。我正在阅读的主题有24个分区,我有3个不同的节点,我预计Kafka会在它们之间划分工作。
以下是我的部分代码:
fun buildConsumer(config: KafkaConsumerConfig): KafkaConsumer<String, JsonObject> {
val props = Properties()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = config.kafkaBootstrapServers
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
props[ConsumerConfig.GROUP_ID_CONFIG] = "myGroup"
// Auto commit must be false in parallel consumer
props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonObjectDeSerializer::class.java.name
val consumer = KafkaConsumer<String, JsonObject>(props)
return consumer
}
private fun createReactParallelConsumer(): ReactorProcessor<String, JsonObject> {
val options = ParallelConsumerOptions.builder<String, JsonObject>()
.ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
.maxConcurrency(10)
.batchSize(1)
.consumer(buildConsumer(kafkaConsumerConfig))
.build()
return ReactorProcessor(options)
}
还有我的主要代码:
pConsumer = createReactParallelConsumer()
pConsumer.subscribe(UniLists.of(kafkaConsumerConfig.kafkaTopic))
pConsumer.react { context ->
batchProcessor.processBatch(context)
}
如果您有任何建议,我们将不胜感激
1条答案
按热度按时间laik7k3q1#
我们遇到了一个在www.example.com https://github.com/confluentinc/parallel-consumer/issues/409版本中已关闭的问题0.5.2.4
Parallel客户端保留了旧的未完成的偏移,因为我们的消费者很慢(许多不同的原因),我们到达了保留的末尾(最早的策略),所以每次我们重新启动消费者时,它都在扫描所有那些不兼容的偏移(它没有截断它们- AKA bug)。修复只是将版本从www.example.com更新0.5.2.3到0.5.2.4