Kafka流到ktable连接

62o28rlo  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(369)

我想加入一个
kstream:从一个主题创建,该主题具有json值。我使用值中的两个属性重新设置流的键。示例值(json的片段)。我创建了一个定制的pojo类并使用了一个定制的serdes。 {"value":"0","time":1.540753118800291E9,,"deviceIp":"111.111.111.111","deviceName":"KYZ1","indicatorName":"ifHCInOctets"} 键Map为: map((key, value) -> KeyValue.pair(value.deviceName+value.indicatorName, value)) 我在kstream上看了一眼,然后打印出我使用的键和属性。看起来都不错。
ktable:我从一个主题创建一个ktable,我正在使用python脚本编写主题,主题的关键是 KYZ1ifHCInOctets ,设备名称和指示符名称的组合(从上面)。我做了一个tostream,然后看了一眼结果流。键和值看起来都很好。
现在,当我做一个内部连接,并做一个窥视或通过/到一个主题时,我看到键和值是不匹配的。加入似乎不起作用,

KStream<String, MyPojoClass> joined= datastream.join(table, 
          (data,table)->data
          ,Joined.with(Serdes.String(),myCustomSerde,Serdes.String())
          );

key = XYZ1s1_TotalDiscards
Value = {"deviceName":"ABC2", "indicatorName":"jnxCosQstatTxedBytes"}

我有完全相同的东西通过ksql工作,但想做我自己的流应用程序。

fjaof16o

fjaof16o1#

这个错误听起来太愚蠢了,我的pojo类几乎没有static:()属性,这导致了错误的键。

相关问题