我是Apache Kafka的新手,我的理解是,当使用KTable时,你只能得到一个记录的最后一次更新。但是,我会得到所有匹配的记录。
请考虑下面的示例代码。
添加3条记录
producer.send(
new ProducerRecord<>(
topicName,
"http://www.simpsons.com",
"one"));
producer.send(
new ProducerRecord<>(
topicName,
"http://www.simpsons.com",
"two"));
producer.send(
new ProducerRecord<>(
topicName,
"http://www.familyguy.com",
"three"));
尝试使用键http://www.simpsons.com
查找记录的最后记录信息
Properties props = new Properties();
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.APPLICATION_ID_CONFIG, String.format("%s", uuid));
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, connectionString);
StreamsBuilder builder = new StreamsBuilder();
// ** create the KTable instance **
KTable<String, String> myTable = builder.table(topicName, Consumed.with(AutoOffsetReset.EARLIEST));
myTable.toStream()
.filter((k,v) -> k.equalsIgnoreCase("http://www.simpsons.com"))
.print(Printed.<String, String>toSysOut()
.withLabel("KTable"));
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), new StreamsConfig(props));
kafkaStreams.cleanUp();
kafkaStreams.start();
Thread.sleep(5000);
kafkaStreams.close();
这是输出
[KTable]: http://www.simpsons.com, one
[KTable]: http://www.simpsons.com, two
这里我肯定遗漏了一些基本的东西。我的期望是这应该只打印出一条记录。http://www.simpsons.com, two
任何指导都将不胜感激。:-)
1条答案
按热度按时间hivapdat1#
看起来问题是您没有使用正确的配置创建KTable。默认情况下,KTable是使用实体化示例创建的,该示例指定KTable应该使用默认的serdes键和值实体化到本地状态存储。
在您的代码中,您使用
builder.table(topicName, Consumed.with(AutoOffsetReset.EARLIEST))
创建KTable,它没有为KTable指定任何实体化。这意味着KTable不会被实体化到本地状态存储中,因此它的行为就像一个常规的Kafka流,并返回所有匹配的记录。要解决这个问题,您需要在创建KTable时,指定KTable应该使用实体化示例实体化到本地状态存储。下面是一个例子,说明如何实现这一点:
一旦你这样做了,KTable将被实体化到一个本地状态存储中,并且你应该只在查询记录时得到最后一次更新,正如你所期望的那样。