连接两条Kafka河

ttcibm8c  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(248)

试图连接两个kstream,但我得到类型不匹配错误下面是代码段。

KStream<String, String> longCounts = netExpence.join(netIncome, (key1, key2) -> key1 + "/" + key2,                    JoinWindows.of(joinWindowSizeMs).until(windowRetentionTimeMs),stringSerde, stringSerde, stringSerde);

出现的错误是类型不匹配:无法从string转换为r这是用于连接kstream的语法 join(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, Joined<K,V,VO> joined) 请具体解释一下 ValueJoiner<? super V,? super VO,? extends VR> 是的……谢谢

2skhul33

2skhul331#

这个 ValueJoiner 使用匹配记录的两个值调用,并发出联接结果值。

// key type must be the same for a join
// value type can be different
KStream<KeyType, ValueType1> stream1 = ...
KStream<KeyType, ValueType2> stream2 = ...

KStream<KeyType, OutputType> joined = stream1.join(stream2, ...);

因此, ValueJoiner 一定有 ValueType1 作为第一个泛型( ? super V ),和 ValueType2 作为第二通用( ? super VO ). 对于第三个通用( ? extend VR )指定输出类型(即, OutputType 从上面的例子)。
更新
您还需要为运行时配置正确的serde。如果所有类型都相同,最好通过 StreamsConfig 对应的键和/或值。否则,您可以覆盖每个运算符的默认序列号:
https://docs.confluent.io/current/streams/developer-guide/config-streams.html#default-密钥服务器
https://docs.confluent.io/current/streams/developer-guide/datatypes.html

相关问题