按客户端id筛选

vulvrdjw  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(347)

我使用kafka流来创建一个ktable,它只包含特定于client\ id的数据,而client\ id不是主题键。我对kafka streams很陌生,它看起来很简单,但我对社区中提供的很多非常好的例子有点困惑。
我正在尝试获取客户机id=0123456的inputtopic数据。在ksql中,下面的命令类似于:

CREATE STREAM TOPIC1_CLIENT1 AS
SELECT * FROM TOPIC1
WHERE client_id= '0123456'
EMIT CHANGES;

下面我试图重现同样的行为。有人能告诉我下面我做错了什么吗?它不像我想象的那样过滤。

final KStream<String, String> stream = builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde));
        final KTable<String, String> convertedTable = stream.filter((client_id,v) -> v.equals("0123456")).toTable(Materialized.as("stream-converted-to-table"));
        stream.to(streamsOutputTopic, Produced.with(stringSerde, stringSerde));
        convertedTable.toStream().to(tableOutputTopic, Produced.with(stringSerde, stringSerde));
3bygqnnd

3bygqnnd1#

v 是消息的全部值。为了在ksql中具有命名字段,流上有一个关联的模式,例如,数据是json还是avro,这意味着clientid只是值的一部分

相关问题