javakafka使用者在多线程中的使用

ig9co6j1  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(687)

我在考虑在线程池中使用kafka消费者。我想出了这个办法。现在它似乎工作良好,但我正在考虑的缺点和什么问题,这种方法可以带来。基本上,我需要的是将记录处理与消费分离开来。另外,我需要有一个强有力的保证,即只有在处理完所有记录之后才进行提交。有人能就如何做得更好提出建议吗?

final var consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(topics);
    final var threadPool = Executors.newFixedThreadPool(32);

    while(true) {

        ConsumerRecords<String, String> records;

        synchronized (consumer) {
            records = consumer.poll(Duration.ofMillis(100));
        }

        CompletableFuture.runAsync(this::processTask, threadPool).thenRun(() -> {
            synchronized (consumer) {
                consumer.commitSync();
            }
        });
    }
i34xakig

i34xakig1#

我看到了下面这篇文章,它将Kafka中记录的消费和处理解耦。你可以通过打电话来实现 poll() 方法并借助 pause() 以及 resume() 方法。
在多线程env中处理kafka记录

igetnqfo

igetnqfo2#

发行

此解决方案对于所述要求不可靠:
另外,我需要有一个强有力的保证,即只有在处理完所有记录之后才进行提交
脚本:
poll读取100条记录,开始异步处理
poll读取5条记录,开始异步处理
5条记录的处理将立即进行,并且在100条记录的处理仍在进行中时,消费者提交已完成
消费者崩溃
当再次提起消费者时,最后一次提交将对应于第105条记录。因此,它将开始处理第106条记录,我们错过了成功处理记录1-100的机会。
您只需要通过以下方式提交在该轮询中正在处理的偏移:

void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);

此外,还需要保证排序,以便首先提交第一次轮询,然后提交第二次轮询,依此类推。这将相当复杂。

命题

我相信您正在尝试在消息处理中实现并发性。这可以通过更简单的解决方案来实现。增加max.poll.records以读取一个合适的批,将其分成更小的批,并以异步方式运行它们以实现并发性。完成所有批次后,向Kafka消费者承诺。

相关问题