我有一个主要的Kafka主题,接收一些时间序列数据。我需要获取进入该主题的每个值,将其复制,并根据其键中的值将其发送到多个独立主题中的一个。因为它是时间序列数据,所以每个条目的值中都有一个时间戳。如何在确保推入单独主题中的值不会在时间戳方面出错的同时完成此拆分?
t9aqgxwy1#
您可以使用ksql并使用sql查询创建新主题:
CREATE STREAM pageviews( viewtime BIGINT KEY, userid VARCHAR, pageid VARCHAR ) WITH ( KAFKA_TOPIC='pageviews', VALUE_FORMAT='DELIMITED', PARTITIONS=4, REPLICAS=3 );
https://docs.ksqldb.io/en/latest/developer-guide/create-a-stream/或者叫@matthias j。用kstreams提到的sax:https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/kstream.html
az31mfrm2#
是的。使用kafkastreams,您可以连续读取代理中的任何主题,使用条件(基于id中的情况)处理数据,并在任何其他输出主题中写回代理。或者,要检查其中的结果,您可以从任何其他侦听器订阅这些输出主题。简单快捷。
2条答案
按热度按时间t9aqgxwy1#
您可以使用ksql并使用sql查询创建新主题:
https://docs.ksqldb.io/en/latest/developer-guide/create-a-stream/
或者叫@matthias j。用kstreams提到的sax:https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/kstream.html
az31mfrm2#
是的。使用kafkastreams,您可以连续读取代理中的任何主题,使用条件(基于id中的情况)处理数据,并在任何其他输出主题中写回代理。或者,要检查其中的结果,您可以从任何其他侦听器订阅这些输出主题。简单快捷。