Apache Kafka(v3)- KTable的行为像KStream?

whhtz7ly  于 2022-12-03  发布在  Apache
关注(0)|答案(1)|浏览(125)

我是Apache Kafka的新手,我的理解是,当使用KTable时,你只能得到一个记录的最后一次更新。但是,我会得到所有匹配的记录。
请考虑下面的示例代码。
添加3条记录

producer.send(
    new ProducerRecord<>(
        topicName, 
        "http://www.simpsons.com", 
        "one"));
    
producer.send(
    new ProducerRecord<>(
        topicName, 
        "http://www.simpsons.com", 
        "two"));
        
producer.send(
    new ProducerRecord<>(
        topicName, 
        "http://www.familyguy.com", 
        "three"));

尝试使用键http://www.simpsons.com查找记录的最后记录信息

Properties props = new Properties();
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.APPLICATION_ID_CONFIG, String.format("%s", uuid));
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, connectionString);
       
StreamsBuilder builder = new StreamsBuilder();
// ** create the KTable instance **
KTable<String, String> myTable = builder.table(topicName, Consumed.with(AutoOffsetReset.EARLIEST));

myTable.toStream()
    .filter((k,v) -> k.equalsIgnoreCase("http://www.simpsons.com"))
    .print(Printed.<String, String>toSysOut()
    .withLabel("KTable"));

KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), new StreamsConfig(props));

kafkaStreams.cleanUp();
kafkaStreams.start();

Thread.sleep(5000);

kafkaStreams.close();

这是输出

[KTable]: http://www.simpsons.com, one
[KTable]: http://www.simpsons.com, two

这里我肯定遗漏了一些基本的东西。我的期望是这应该只打印出一条记录。http://www.simpsons.com, two
任何指导都将不胜感激。:-)

hivapdat

hivapdat1#

看起来问题是您没有使用正确的配置创建KTable。默认情况下,KTable是使用实体化示例创建的,该示例指定KTable应该使用默认的serdes键和值实体化到本地状态存储。
在您的代码中,您使用builder.table(topicName, Consumed.with(AutoOffsetReset.EARLIEST))创建KTable,它没有为KTable指定任何实体化。这意味着KTable不会被实体化到本地状态存储中,因此它的行为就像一个常规的Kafka流,并返回所有匹配的记录。
要解决这个问题,您需要在创建KTable时,指定KTable应该使用实体化示例实体化到本地状态存储。下面是一个例子,说明如何实现这一点:

// Create a Materialized instance that specifies that the KTable should be materialized to a local state store
// using the default serdes for the key and value
Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("my-store");

// Use the Materialized instance when creating the KTable
KTable<String, String> myTable = builder.table(topicName, Consumed.with(Serdes.String(), Serdes.String()), materialized);

一旦你这样做了,KTable将被实体化到一个本地状态存储中,并且你应该只在查询记录时得到最后一次更新,正如你所期望的那样。

相关问题