我需要从Kafka的主题中获得信息。例如,我的主题有一千条记录。我需要使用消息计数基础。计数为参数。即,如果我给予计数100,则只使用100条消息,下一个循环将使用接下来需要使用的100条记录。如何使用Kafka的作品。
cu6pst1q1#
我回答我自己的问题,它是帮助我做喜欢批处理.下面的代码示例消耗数据5秒,消耗的数据被推送到一个数组中,然后在进程完成后执行该进程恢复consumer
consumer
var { Kafka } = require('kafkajs') const kafka = new Kafka({ clientId: 'consumer', brokers: ['kafka:0'] }) const consumer = kafka.consumer({ groupId: 'test_topic_groups', allowAutoTopicCreation: true }) const run = async () => { // Consuming await consumer.connect() await consumer.subscribe({ topic: 'test-topic', fromBeginning: true }) var isPaused = false; var data = [] await consumer.run({ eachMessage: async ({ topic, partition, message }) => { data.push(message) if (!isPaused) { isPaused = true; setTimeout(async () => { consumer.pause([{ "topic": "test-topic" }]) doyourProcessandResume(data) console.log('Consumer paused...........') }, 500) } } }) function doyourProcessandResume(data) { // Do the process here , now i put timeout only then resume the consumer setTimeout(() => { isPaused = false consumer.resume([{ "topic": 'test-topic' }]); console.log('Consumer resumed..............') }, 2000) } } run().catch(console.error)
1条答案
按热度按时间cu6pst1q1#
我回答我自己的问题,它是帮助我做喜欢批处理.
下面的代码示例消耗数据5秒,消耗的数据被推送到一个数组中,然后在进程完成后执行该进程恢复
consumer