我是一个学习Kafka的新生,在了解多个消费者方面遇到了一些基本问题,到目前为止,文章、文档等都没有太大帮助。
我试着做的一件事是编写自己的高级Kafka生产者和消费者,并同时运行它们,向一个主题发布100条简单消息,让消费者检索它们。我已经成功地做到了这一点,但是当我尝试引入第二个使用者来使用刚刚发布的消息所指向的同一主题时,它没有收到任何消息。
我的理解是,对于每一个主题,你可以有来自不同消费群体的消费者,这些消费者群体中的每一个都会得到某个主题的消息的完整副本。是这样吗?如果没有,我应该怎样设置多个消费者?这是我到目前为止写的消费者类:
public class AlternateConsumer extends Thread {
private final KafkaConsumer<Integer, String> consumer;
private final String topic;
private final Boolean isAsync = false;
public AlternateConsumer(String topic, String consumerGroup) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", consumerGroup);
properties.put("partition.assignment.strategy", "roundrobin");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<Integer, String>(properties);
consumer.subscribe(topic);
this.topic = topic;
}
public void run() {
while (true) {
ConsumerRecords<Integer, String> records = consumer.poll(0);
for (ConsumerRecord<Integer, String> record : records) {
System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
}
}
}
}
此外,我注意到,最初我是针对一个只有一个分区的主题“test”测试上述消耗。当我将另一个消费者添加到一个现有的消费者组(比如“testgroup”)时,这触发了kafka重新平衡,它显著地降低了我消费的延迟,以秒为单位。我认为这是一个重新平衡的问题,因为我只有一个分区,但是当我创建了一个新的主题“multiplepartitions”,比如说6个分区时,出现了类似的问题,将更多的使用者添加到同一个使用者组会导致延迟问题。我环顾四周,人们告诉我,我应该使用多线程消费者——有人能解释一下吗?
3条答案
按热度按时间nwsw7zdq1#
如果希望多个使用者使用相同的消息(如广播),可以使用不同的使用者组生成它们,还可以在使用者配置中将auto.offset.reset设置为最小。如果希望多个使用者并行完成消费(在它们之间分配工作),则应创建number of partitions>=number of consumers。一个分区最多只能由一个使用者进程使用。但是一个使用者可以使用多个分区。
bwntbbo32#
我认为您的问题在于auto.offset.reset属性。当一个新的使用者从一个分区中读取数据并且没有以前提交的偏移量时,auto.offset.reset属性用于决定起始偏移量应该是什么。如果将其设置为“最大”(默认值),则从最新(最后)消息开始读取。如果将其设置为“最小”,则会收到第一条可用消息。
所以加上:
再试一次。
“最小的”和“最大的”不久前被弃用了。你现在应该用“最早的”或“最新的”。有什么问题,查一下文件
rkue9o1l3#
在这里的文档中,它说:“如果您提供的线程多于主题上的分区,那么某些线程将永远看不到消息”。你能给你的主题添加分区吗?我的使用者组线程数等于主题中的分区数,每个线程都在接收消息。
以下是我的主题配置:
我的消费者: