java—使用kafka流进行自定义转换

s4chpxco  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(381)

我一直在实施 ETL 使用apachekafka的数据管道。我用Kafka连接提取和加载。
connect将读取源数据并以json的形式编写kafka主题的实际数据。
在转换阶段,我想从一个kafka主题中读取json数据,然后需要转换为基于一些自定义业务逻辑的sql查询,然后需要编写输出kafka主题。
到目前为止,我已经编写了一个生产者-消费者应用程序,它从topic读取数据并进行转换,然后写入输出topic。
有没有可能使用kafka流api实现同样的功能?如果是,请提供一些样品。

9rbhqvlz

9rbhqvlz1#

看看Kafka流,或ksql。ksql运行在kafka流之上,它提供了一种非常简单的方法来构建您所讨论的聚合。
下面是一个在ksql中聚合数据流的示例

SELECT PAGE_ID,COUNT(*) FROM PAGE_CLICKS WINDOW TUMBLING (SIZE 1 HOUR) GROUP BY PAGE_ID

更多信息请访问:https://www.confluent.io/blog/using-ksql-to-analyse-query-and-transform-data-in-kafka
您可以获取ksql的输出,它实际上只是一个kafka主题,并通过kafka连接流式传输,例如到elasticsearch、cassandra等等。

相关问题