如何使用kafkajs npm来消费来自Kafka主题计数基础或批处理模型的消息

xcitsw88  于 2022-12-17  发布在  Apache
关注(0)|答案(1)|浏览(122)

我需要从Kafka的主题中获得信息。
例如,我的主题有一千条记录。我需要使用消息计数基础。计数为参数。即,如果我给予计数100,则只使用100条消息,下一个循环将使用接下来需要使用的100条记录。
如何使用Kafka的作品。

cu6pst1q

cu6pst1q1#

我回答我自己的问题,它是帮助我做喜欢批处理.
下面的代码示例消耗数据5秒,消耗的数据被推送到一个数组中,然后在进程完成后执行该进程恢复consumer

var { Kafka } = require('kafkajs')
const kafka = new Kafka({
    clientId: 'consumer',
    brokers: ['kafka:0']
})
const consumer = kafka.consumer({ groupId: 'test_topic_groups', allowAutoTopicCreation: true })

const run = async () => {
    // Consuming
    await consumer.connect()
    await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
    var isPaused = false;
    var data = []
    await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
            data.push(message)
            if (!isPaused) {
                isPaused = true;
                setTimeout(async () => {
                    consumer.pause([{ "topic": "test-topic" }])
                    doyourProcessandResume(data)
                    console.log('Consumer paused...........')
                }, 500)
            }
        }
    })
    function doyourProcessandResume(data) {
        // Do the process here , now i put timeout only then resume the consumer
        setTimeout(() => {
            isPaused = false
            consumer.resume([{ "topic": 'test-topic' }]);
            console.log('Consumer resumed..............')
        }, 2000)
    }
}
run().catch(console.error)

相关问题