java Kafka Streams -用于自定义对象的Serdes

bogh5gae  于 2022-12-25  发布在  Java
关注(0)|答案(1)|浏览(219)

我是Kafka Streams的新手,发现总体来说API很混乱,文档也不容易理解。我正在编写一个简单的流应用程序,如下所示:
1.输入流:key(String)-〉userID,value(String)-〉用户的事务记录JSON字符串。
1.拓扑:聚合上述输入并生成一个KTable〈String,UserAccountBalance〉,Key为userId,value为一个自定义对象,随着聚合的进行而更新。

final KStream<String, String> transactionsInput = streamsBuilder.stream("bank-balance-input-topic");
 final KTable<String, UserBankBalance> table =
         transactionsInput.groupBy((key, value) -> key)
                          .aggregate(() -> new UserBankBalance("dummyUserId", 0, "1866-12-23T17:47:37Z"),
                                     (userName, transactionRecordStr, userBankBalance) -> {
                                         // Code does the following:
                                         //   1. Deserialize the transactionRecordStr
                                         //   2. Update the UserBankBalance object.
                                         //   return userBankBalance;
                                     });

(默认键、值serdes配置为String)然而,在运行一些健全性测试时,我发现String序列化器与UserBankBalance对象不兼容。

  • 为什么mapValues、groupByKey、aggregate等操作需要Serdes?

我的理解是:

  • 流库需要使用默认的serdes来具体化对象以更新内部状态?
  • 如果发生了重新分区,键、值需要序列化并存储回内部分区以供进一步处理?
  • 鉴于上述情况,即使我们只是将KTable<String,UserBankBalance>创建为内存中的表示,仍然需要Serdes。

我已经阅读了官方文件和API文件,只是找不到任何好的澄清。

  • 为什么Kafka Stream的库不提供一个默认的ObjectMapperSerdes,它利用了Jackson的ObjectMapper(就像这个官方例子)?我想很多用户会有类似的用例,库用户这样做会有重复的努力。

参考文献:

jvlzgdj9

jvlzgdj91#

为什么操作需要Serdes
Kafka存储字节。Streams API不将对象从一个操作传递到另一个操作,它使用Kafka作为消息总线。它需要将任何对象序列化为字节以通过网络发送。
如果您正在使用JSON,那么Kafka已经有一种内置的方法来创建JSONSerde;因为可以使用Serdes.serdeFrom静态方法,所以不需要ObjectMapper类路径(同样,它会创建对connect-json模块的依赖关系,并扩大kafka-streams类路径)。
或者,Spring-Kafka也有JsonSerde,Confluent维护AvroSerdeProtobufSerde等,以便与这些工具生成的类一起使用。
获取的字符串序列化程序与UserBankBalance对象不兼容
您需要对每个操作使用Grouped , Materialized , Consumed , or Produced类的某种组合来覆盖默认的拓扑serde。

相关问题