Can't get KafkaJS to read messages concurrently

Posted by 6za6bjd0 on 2021-06-04 in Kafka

I want KafkaJS to read messages concurrently.
My topic is running with two partitions:

Topic:Tokens_Activity_Sandbox   PartitionCount:2    ReplicationFactor:1 Configs:segment.bytes=1073741824
    Topic: Tokens_Activity_Sandbox  Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
    Topic: Tokens_Activity_Sandbox  Partition: 1    Leader: 1001    Replicas: 1001  Isr: 1001

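I wrote a small test client.
When I send `sleep` commands to the topic in parallel (one per partition), the client reads them one at a time, sequentially rather than in parallel: the second message is only picked up once the first sleep has finished.
What am I doing wrong?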

const topic = 'Tokens_Activity_Sandbox';
const { Kafka } = require('kafkajs');
const util = require('util');
const sleep = util.promisify(setTimeout);
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['192.168.0.135:9092'],
});

const groupId = 'test-group2333';
const admin = kafka.admin({
  retry: { retries: 0 },
});
const producer = kafka.producer({ allowAutoTopicCreation: false });

const consumer = kafka.consumer({
  allowAutoTopicCreation: false,
  groupId: groupId,
  retry: { retries: 1, restartOnFailure: async () => false },
});

if (process.env.CREATE === 'true') {
  (async () => {
    await admin.createTopics({
      topics: [
        {
          topic: topic,
          numPartitions: 2, // default: 1
        },
      ],
    });
  })();
}

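process.on('SIGINT', async () => {
  // Close every open client, then exit explicitly so the process does not hang.
  await Promise.all([consumer.disconnect(), producer.disconnect(), admin.disconnect()]);
  process.exit(0);
});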

const eachMessageF = async ({ topic, partition, message }) => {
  console.log('incoming message', {
    topic,
    partition,
    offset: message.offset,
    value: message.value.toString(),
  });

  if (message.value.toString() === 'sleep') {
    console.log('going to sleep');
    await sleep(5000);
    console.log('woke up from sleep');
  }
  console.log(`done processing ${message.value.toString()}`);
};

const run = async () => {
  // admin
  await admin.connect();
  await producer.connect();
  // Consuming
  await consumer.connect();
  await consumer.subscribe({ topic: topic, fromBeginning: false });
  await consumer
    .run({
      partitionsConsumedConcurrently: 2,
      eachMessage: async ({ topic, partition, message }) => {
        try {
          await eachMessageF({ topic, partition, message });
        } catch (e) {
          console.log('caught the error <3', e);
        }
      },
    })
    .catch((err) => {
      console.log('caught err in catch of consumer.run');
      throw err;
    });
};

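// run() rethrows consumer.run errors, so handle the rejection here.
run().catch((err) => {
  console.error('run failed', err);
  process.exit(1);
});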
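
The client above creates a producer but never shows how the test messages are sent. Below is a minimal sketch of how the two `sleep` messages might be sent, one pinned to each partition, in a single `producer.send()` call. The helper names `buildSleepMessages` and `sendSleepToAllPartitions` are hypothetical and not part of the original client; `producer` is assumed to be an already connected KafkaJS producer.

```javascript
// Hypothetical helper: builds one 'sleep' message per partition,
// each explicitly pinned to its partition via the `partition` field.
function buildSleepMessages(partitionCount) {
  return Array.from({ length: partitionCount }, (_, partition) => ({
    value: 'sleep',
    partition,
  }));
}

// Hypothetical helper: sends all messages in a single producer.send() call.
// Assumes `producer` is a KafkaJS producer on which connect() has completed.
async function sendSleepToAllPartitions(producer, topic, partitionCount) {
  await producer.send({
    topic,
    messages: buildSleepMessages(partitionCount),
  });
}
```

With the client above, this could be called as `await sendSleepToAllPartitions(producer, topic, 2)` after `producer.connect()`.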

This is the output of the run:

incoming message {
  topic: 'Tokens_Activity_Sandbox',
  partition: 1,
  offset: '4',
  value: 'sleep'
}
going to sleep
woke up from sleep
done processing sleep
incoming message {
  topic: 'Tokens_Activity_Sandbox',
  partition: 0,
  offset: '6',
  value: 'sleep'
}
going to sleep
woke up from sleep
done processing sleep
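
To check whether the handlers ever overlap without reading the log order by eye, the handler could be wrapped in a small in-flight counter. This is only a sketch: `trackConcurrency` is a hypothetical helper, not a KafkaJS API, and it only counts overlapping calls within this one process.

```javascript
// Hypothetical helper: wraps an async handler and records the peak
// number of calls that were running at the same time.
function trackConcurrency(handler) {
  const stats = { inFlight: 0, maxInFlight: 0 };
  const wrapped = async (payload) => {
    stats.inFlight += 1;
    stats.maxInFlight = Math.max(stats.maxInFlight, stats.inFlight);
    try {
      return await handler(payload);
    } finally {
      // Always decrement, even if the handler throws.
      stats.inFlight -= 1;
    }
  };
  return { wrapped, stats };
}
```

Passing `wrapped` as `eachMessage` and logging `stats.maxInFlight` after both messages are done would show 2 if the two sleeps overlapped; for the sequential output above it would be expected to stay at 1.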
