Kafka加入后不开火重键

jhkqcmku  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(203)

我正在开发一个用kotlin编写的kafka streams应用程序,我看到了一些奇怪的连接行为。在高层次上,我用不同的键来流式处理两个主题。但是,我可以重新设置其中一条消息的键,以便它们的键排列在一起。但在我这样做之后,随后的连接不会被触发。下面我提供了简化的代码(删除不相关的部分并用注解替换)

val builder = KStreamBuilder()
val joinWindow = JoinWindows.of(/* 30 days */).until(/* 365 days */)

val topicOneStream = builder.stream<String, String>(topicOne)
val reKeyedTopicOneStream = topicOneStream.filter({ key, value ->
        // ... some logic to filter messages here
}).selectKey({ _, value ->
        // Rekey messages here based on the signature (which will match the key from corresponding messages on topicTwo)
        JSON.parseMessage(value)?.data?.authorization?.data?.signature
}).peek({ key, value -> logger.info("post-topicOne-filter $key, $value")})

val topicTwoStream = builder.stream<String, String>(topicTwo)
val filteredTopicTwoStream = topicTwoStream.filter({ key, value ->
        // ... other filtering logic goes here
        // these keys match those of the re-keyed messages from topic one
}).peek({ key, value -> logger.info("post-hello-sign event filter: $key, $value")})

val joinedStream = reKeyedTopicOneStream.join(filteredTopicTwoStream, { _, _  ->
    // Doesn't fire... sometimes
}, joinWindow)

当我们运行这个程序时,我们会看到来自两个peek的控制台输出,这表明消息已经按照预期进行了过滤和重新设置了密钥。
我们还尝试在连接之前将两个流的输出发送到两个分区相似的主题,这样我们就可以对主题进行分类,并查看消息正在写入哪个分区以及使用哪个键。结果看起来不错,如下所示:

主题一: Message with key 7e7f4e74-fc5e-4676-893a-353e4fb217f6 at partition 1 at offset 0 ####主题二: Message with key 7e7f4e74-fc5e-4676-893a-353e4fb217f6 at partition 1 at offset 0 有什么我不知道的吗?我的理解是,具有相同密钥的消息应该会导致join触发,但这不是我看到的。谢谢你的帮助!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题