kakfa使用者正在处理下一条消息,甚至在提交同一主题的第一条消息之前

r8uurelv  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(286)

我创建了一个连接器,将集合的所有插入/更新事件推送到同一主题(1个分区)。处理insert事件消息的使用者代码将比update事件花费一些时间。
这里的问题是消息不是基于消费顺序提交的。以下是Kafka消费者的步骤和行为。
在集合中插入一条记录,并在一秒钟内更新字段。
该主题现在有2条记录。一个用于插入事件(消息1),另一个用于更新事件(消息2)。
消息1已使用
消息2已使用
消息2已提交
消息1已提交
有什么方法可以让我等到消息1被提交,然后使用消息2。
消费者代码:

const startConsumer = async () => {

        // Creating a kafka consumer group.
        const kafkaConsumerGroup = new kafkaNode.ConsumerGroup(consumer_configs, topic_name);

        kafkaConsumerGroup.on("connect", () => {
            console.log("Consumer Group Connected Successfully");
        });

        // Listening for messages from kafka.
        kafkaConsumerGroup.on("message", async (message) => {

                //processor code
                const data = JSON.parse(message.value);
                response = await processData(JSON.parse(data.payload));

                //Commit
                kafkaConsumerGroup.commit((error, data) => {
                    if (error) {
                        console.log(error);
                    } else {
                        console.log('Commit success ');
                    }
                });
        });

        kafkaConsumerGroup.on("error", (error) => {
            console.log(error);
        });
    };

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题