阅读Kafka主题中的消息然后关闭的正确方式

ni65a41a  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(601)

我正在使用express和kafka节点构建一个简单的node.jsapi,当接收到http请求时,它将从请求的kafka主题和使用者组返回未读消息,然后关闭连接。我不需要或不希望消费者继续等待新消息。
在kafka节点中,检查是否已到达主题结尾的正确方法是什么?如果是,请关闭与代理的连接并退出应用程序,以防止读取新消息?
这是我的consumer.js。这与kafka节点文档中给出的示例基本相同。

"use strict";

const kafka = require("kafka-node");

let topicName = "testTopic-01",
  groupName = "testGroup-01",
  consumerOptions = {
    kafkaHost: "localhost: 9092",
    groupId: groupName,
    sessionTimeout: 15000,
    protocol: ["roundrobin"],
    fromOffset: "earliest",
    encoding: "utf8"
  };

const consumerGroup = new kafka.ConsumerGroup(consumerOptions, topicName);

consumerGroup.on("message", message => {
  console.log(`Message: ${message.value}`);
});
consumerGroup.on("error", error => {
  console.error(error);
});

console.log(`Consumer started on topic ${topicName} on group ${groupName}`);
thtygnil

thtygnil1#

您可以使用#offset获取主题分区的当前偏移量。通过比较所分配的主题分区的偏移量,就可以知道相应主题分区中的最后一条消息是什么。
请记住,如果您有多个并行的使用者,您应该跟踪使用者组中的使用者被分配到的主题分区(#fetchcommits)。

相关问题