我想加入一个
kstream:从一个主题创建,该主题具有json值。我使用值中的两个属性重新设置流的键。示例值(json的片段)。我创建了一个定制的pojo类并使用了一个定制的serdes。 {"value":"0","time":1.540753118800291E9,,"deviceIp":"111.111.111.111","deviceName":"KYZ1","indicatorName":"ifHCInOctets"}
键Map为: map((key, value) -> KeyValue.pair(value.deviceName+value.indicatorName, value))
我在kstream上看了一眼,然后打印出我使用的键和属性。看起来都不错。
ktable:我从一个主题创建一个ktable,我正在使用python脚本编写主题,主题的关键是 KYZ1ifHCInOctets
,设备名称和指示符名称的组合(从上面)。我做了一个tostream,然后看了一眼结果流。键和值看起来都很好。
现在,当我做一个内部连接,并做一个窥视或通过/到一个主题时,我看到键和值是不匹配的。加入似乎不起作用,
KStream<String, MyPojoClass> joined= datastream.join(table,
(data,table)->data
,Joined.with(Serdes.String(),myCustomSerde,Serdes.String())
);
key = XYZ1s1_TotalDiscards
Value = {"deviceName":"ABC2", "indicatorName":"jnxCosQstatTxedBytes"}
我有完全相同的东西通过ksql工作,但想做我自己的流应用程序。
1条答案
按热度按时间fjaof16o1#
这个错误听起来太愚蠢了,我的pojo类几乎没有static:()属性,这导致了错误的键。