我正在开发一个用kotlin编写的kafka streams应用程序,我看到了一些奇怪的连接行为。在高层次上,我用不同的键来流式处理两个主题。但是,我可以重新设置其中一条消息的键,以便它们的键排列在一起。但在我这样做之后,随后的连接不会被触发。下面我提供了简化的代码(删除不相关的部分并用注解替换)
val builder = KStreamBuilder()
val joinWindow = JoinWindows.of(/* 30 days */).until(/* 365 days */)
val topicOneStream = builder.stream<String, String>(topicOne)
val reKeyedTopicOneStream = topicOneStream.filter({ key, value ->
// ... some logic to filter messages here
}).selectKey({ _, value ->
// Rekey messages here based on the signature (which will match the key from corresponding messages on topicTwo)
JSON.parseMessage(value)?.data?.authorization?.data?.signature
}).peek({ key, value -> logger.info("post-topicOne-filter $key, $value")})
val topicTwoStream = builder.stream<String, String>(topicTwo)
val filteredTopicTwoStream = topicTwoStream.filter({ key, value ->
// ... other filtering logic goes here
// these keys match those of the re-keyed messages from topic one
}).peek({ key, value -> logger.info("post-hello-sign event filter: $key, $value")})
val joinedStream = reKeyedTopicOneStream.join(filteredTopicTwoStream, { _, _ ->
// Doesn't fire... sometimes
}, joinWindow)
当我们运行这个程序时,我们会看到来自两个peek的控制台输出,这表明消息已经按照预期进行了过滤和重新设置了密钥。
我们还尝试在连接之前将两个流的输出发送到两个分区相似的主题,这样我们就可以对主题进行分类,并查看消息正在写入哪个分区以及使用哪个键。结果看起来不错,如下所示:
暂无答案!
目前还没有任何答案,快来回答吧!