kafka用例:连续读取kafka,对消息进行解密,然后插入db

pwuypxnk  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(393)

kafka用例:连续读取kafka(可能是一个流),用java方法对消息值进行解密,然后插入db(sink connector或者我有一个java restapi,如果需要可以插入db)
我还没有看到这个用例的任何可行的解决方案,因为:
我无法在Kafka中存储解密的数据。
我需要在插入数据库之前执行解密步骤。
查看kafka connect和kafka streams以解决用例:
似乎我不能使用Kafka连接,因为我不知道如何包含解密步骤。
好像我不能用Kafka流,因为1。它是为阅读和写回主题而构建的。2即使我使用处理器api来实现自定义逻辑,我也不知道如何转换 KStream 消息值到 String 我可以传递给我的java解密方法或数据库。
我有一个在java应用程序中使用常规kafka使用者的解决方案,但它是作为一个一次性的批处理作业来完成的,我需要一个长期存在并不断检查kafka队列的应用程序。我可以无限循环处理批处理作业,但我不确定这是一个可行的选择。
如何完成这个用例?Spark不是我们的选择。

wqlqzqxt

wqlqzqxt1#

您应该使用kafka connect并通过实现 Transformation 接口:
https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/transforms/transformation.java
查看此演讲了解更多详细信息:https://www.confluent.io/thank-you/single-message-transformations-not-transformations-youre-looking/

相关问题