我是kafka的新手,我安装了kafka的最新版本,并用0.9.0.1javaproducerapi成功地将一些消息发布到了一个主题“test”。这些消息可从命令行工具和其他第三方工具获得。然而,无论我尝试了什么,我都无法用新的高级消费api来读取这些消息。基本上我是从示例代码中复制代码的。
Properties props = new Properties();
props.put("bootstrap.servers", kafkaServer);
props.put("group.id", "testgroup");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(topics);
ConsumerRecords<String, String> records = consumer.poll(100);
这里总是返回一个空集。如果我调用set partitions=consumer.assignment();结果也是空的。我错过什么了吗?
另一个问题是,“group.id”是消费者必须的道具吗?如果删除“group.id”属性,则在轮询时会引发“invalid group id”异常。如果我们在某些情况下不需要一个消费群体呢?
暂无答案!
目前还没有任何答案,快来回答吧!