我们需要听多个主题,并在每个主题的事件中寻找特定的领域。每个主题事件都是json格式的,并且json格式的固定字段很少。需要从所有这些多个主题中筛选事件,并在每个事件负载中查找特定字段。如果此字段值与特定格式匹配,则将这些事件从不同主题发送到一个固定主题,该主题可由另一个使用者进一步处理。
我们正在寻找ksql是否能在这个场景中提供帮助—我们从多个主题创建一个流,并根据ksql流中的固定列过滤数据,然后将其推送到新主题。我的问题是:1)是否可以从多个主题创建ksql流?2) 是否可以将主题的完整事件负载作为ksql流中的一列获取?
在高层次上(使用错误的ksql语法),我正在寻找
CREATE STREAM my_all_topics (myFixedFiedl1 varchar, eventPayload varchar) WITH (value_format = 'json', kafka_topic_LIST='topic1, topic2, topic3');
CREATE STREAM mytopic_stream (myFixedFiedl1 varchar, eventPayload varchar) with (kafka_topic='my-final-topic-name', value_format='json')
as select myFixedField1, eventPayload from my_all_topics where myFixedField1 like 'myprefix%';
2条答案
按热度按时间bfhwhh0e1#
你不能完全按照你的意愿去做-一个ksql
STREAM
它来源于一个而且只有一个Kafka主题。但你可以用ksql的
INSERT INTO
实现你想要的功能。为源主题建模:
基于第一个源主题创建目标主题:
指定从其余源主题插入到目标主题:
另请参见
https://www.youtube.com/watch?v=z508vddtp_m
https://docs.confluent.io/current/ksql/docs/tutorials/basics-local.html#insert-进入
6qqygrtg2#
我不确定,但你似乎能把溪流和水结合起来
JOIN
.如果您还没有向ksql注册kafka主题,那么您必须首先处理好这一步。