我使用kafka流来创建一个ktable,它只包含特定于client\ id的数据,而client\ id不是主题键。我对kafka streams很陌生,它看起来很简单,但我对社区中提供的很多非常好的例子有点困惑。
我正在尝试获取客户机id=0123456的inputtopic数据。在ksql中,下面的命令类似于:
CREATE STREAM TOPIC1_CLIENT1 AS
SELECT * FROM TOPIC1
WHERE client_id= '0123456'
EMIT CHANGES;
下面我试图重现同样的行为。有人能告诉我下面我做错了什么吗?它不像我想象的那样过滤。
final KStream<String, String> stream = builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde));
final KTable<String, String> convertedTable = stream.filter((client_id,v) -> v.equals("0123456")).toTable(Materialized.as("stream-converted-to-table"));
stream.to(streamsOutputTopic, Produced.with(stringSerde, stringSerde));
convertedTable.toStream().to(tableOutputTopic, Produced.with(stringSerde, stringSerde));
1条答案
按热度按时间3bygqnnd1#
v
是消息的全部值。为了在ksql中具有命名字段,流上有一个关联的模式,例如,数据是json还是avro,这意味着clientid只是值的一部分