我正在尝试用kafka实现一个多消费者-单个生产者的设置。我创建了一个包含3个分区的主题,如下所示:
./kafka-create-topic.sh --topic stream.main.out --zookeeper
localhost:2181 --partition 3
生产商设置如下:
props = new Properties();
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("broker.list","xxx.xxx.x:9092,.....");
props.put("request.required.acks", "1");
props.put("topic.metadata.refresh.interval.ms", "1");
props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
//props.put("enable.auto.cimmit","true");
//props.put("auto.commit.interval.ms","10");
现在,如果我启动第一个消费者,它运行良好;启动第二个消费者是问题所在。它启动了,但似乎是等待消息,没有发生任何事情超过10分钟。为什么会这样?起初我尝试在没有多个分区的情况下执行此操作,但后来每个使用者似乎都得到了相同的消息,即使使用相同的组id。
props = new Properties();
props.put("metadata.broker.list", "192.xxxx:9092,....");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("broker.list","192.1xxx:9092,192....");
props.put("request.required.acks", "1");
ConsumerConnector consumer =
kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig(zooKeeper, groupId));
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
for ( KafkaStream stream : streams) {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext())
1条答案
按热度按时间ocebsuys1#
很可能,这个问题与
group.id
(组id)。如果您使用相同的group.id
对于两个使用者,他们将读取不同的分区集,即不同的消息集。如果您想使用来自两个使用者的所有消息,那么必须定义一个不同的
group.id
对于每个不同的消费者。根据Kafka官方文件:
使用者用使用者组名称标记自己,发布到主题的每条记录都会传递到每个订阅使用者组中的一个使用者示例。使用者示例可以在不同的进程中,也可以在不同的机器上。
如果所有使用者示例都具有相同的使用者组,则记录将有效地在使用者示例上进行负载平衡。
如果所有使用者示例都有不同的使用者组,则每个记录都将广播给所有使用者进程。
如果你用不同的
group.id
对于每个使用者,如果您仍然无法读取来自双方的消息,您可以尝试运行控制台使用者,并确保您创建的使用者类没有问题。