我有一个场景,在它到达kafka之前,我需要更改从jdbc源连接器获得的消息结构。我是一个初学者Kafka,所以Kafka流可以帮助我实现这一点。
输入-
{"id":"123","firstName":"pqr","lastName":"xyz","age":"23","dob":"20-09-1995"}
我想把它存储在elasticseacrh索引中
{"id":"123","name":{"firstName":"pqr","lastName":"xyz"},"age":"23","dob":"20-09-1995"}
我不是Kafka流设置,我想使用它,如果使用ksql是可能的,我想知道。
1条答案
按热度按时间b91juud31#
你好像在用Kafka连接。在这种情况下,“单一消息转换”(smt)是您的朋友。这些允许您在连接器将数据写入主题之前逐个转换每条消息。
查看此博客文章了解更多详细信息:https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/