Spring Kafka使用AckMode手动确认,RECORD

scyqe7ek  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(165)

我正在尝试将Kafka用于下面的用例。
我们的生产者将产生消息到Kafka主题。我们的消费者天生就很慢,因为每个消息的处理都会根据消息的内容而有所不同。我们还根据一些条件对传入消息进行批处理,并一次性处理批处理消息列表。我们尝试了以下两种方法。
1.多分区:有了多个分区,我们可以用同一个消费者线程批处理消息,这运行良好,但是当我们在批处理中有一些较小的消息集,并且我们处理批处理的条件没有得到满足时,我们将等待新消息来处理已经批处理的现有消息。我们尝试了超时,但由于消费者线程在我们收到消息之前不会执行,因此我们无法处理最后一段消息。
1.一个分区-在内存队列中:我们尝试使用单个分区,将所有传入的消息写入内存中的并发队列,并且我们使用ExecutorService生成5个线程(假设)。所有线程都能够从队列中读取唯一的消息并处理它们。但问题在于承认。我们将确认沿着消息保存在队列中,并尝试在处理后确认每个消息。但是我们观察到的是,在内部队列进行部分处理之后,如果重新启动消费者,我不会看到其他消息返回给消费者。我们猜测这是因为我们正在做的ack。这里确认模式id为MANUAL/MANUAL_IMMEDIATE
代码如下:

@KafkaListener(topics = "#{'${kafka-consumer.topics}'.split(',')}", groupId = "${kafka-consumer.groupId}", concurrency = "1")
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) throws InterruptedException {
        JSONObject jsonObject = new JSONObject(record.value());
        JSONArray request_data = jsonObject.getJSONArray("request");
        JSONObject data = request_data.getJSONObject(0);
        MessagePair messagePair = new MessagePair();
        messagePair.setMessage(data);
        messagePair.setAcknowledgment(ack);
        linkedQueue.add(messagePair);
        System.out.println("Queue size is : " + linkedQueue.size());

        if (executorService == null) {
            System.out.println("Starting executor service");
            executorService = Executors.newFixedThreadPool(2);
            for (int i = 0; i < 2; i++) {
                QueeuTask queeuTask = new QueeuTask(linkedQueue);
                System.out.println();
                executorService.execute(queeuTask);
            }
        }
}

下面的printList方法将由每个执行器线程在基于自定义条件创建批处理之后调用。我们的实际处理将在下面的方法中进行,处理后,我们将尝试确认每个消息。

public void Printlist(ArrayList<MessagePair> lst, String threadName) throws InterruptedException {
        //Process the messages
        for(MessagePair pair : lst) {
            System.out.println("Sending acknowledgement for message : " + pair.getMessage());
            pair.getAcknowledgment().acknowledge();
        }
        lst.clear();
}

通过上述实现,我们确实看到了本地队列中未处理的丢失消息。
有人可以请帮助我们如何才能实现这一承认发生在每一个消息。我们已经尝试使用AckMode.RECORD,它说

没有Acknowledgment可作为参数,侦听器容器必须有一个MATERIAL AckMode来填充Acknowledgment

感谢你的帮助。

eqqqjvef

eqqqjvef1#

在分析了用例之后,我们改变了设计,以确保Kafka的消费者不会慢下来。我们还处理偏移提交,以确保处理完成。在新的用例中,我们使用Kafka批处理侦听器在一次轮询中读取固定消息,并创建与实际流程相等的负载,并使用Kafka侦听器处理这些负载,这不是非常密集的处理。感谢所有的建议。

相关问题