我想按键连接几个(通常是2-10个)kafka主题,最好使用流式api。所有主题都将具有相同的键和分区。执行此联接的一种方法是创建 KStream
对于每个主题和链调用 KStream.outerJoin
:
stream1
.outerJoin(stream2, ...)
.outerJoin(stream3, ...)
.outerJoin(stream4, ...)
但是 KStream.outerJoin
建议每次呼叫 outerJoin
将具体化其两个输入流,因此上面的示例不仅将具体化流1到4,而且还将具体化 stream1.outerJoin(stream2, ...)
以及 stream1.outerJoin(stream2, ...).outerJoin(stream3, ...)
. 与直接连接4个流相比,会有很多不必要的序列化、反序列化和i/o。
上述方法的另一个问题是 JoinWindow
不会在所有4个输入流中保持一致:一个 JoinWindow
将用于连接流1和流2,但随后将使用一个单独的连接窗口来连接此流和流3等。例如,我为每个连接指定一个10秒的连接窗口,并且具有特定键的条目在0秒时出现在流1中,在6秒时出现在流2中,在12秒时出现在流3中,在18秒时出现在流4中,连接的项将在18秒后获得输出,导致延迟过高。结果取决于连接的顺序,这看起来很不自然。
有没有更好的方法来使用kafka实现多路连接?
2条答案
按热度按时间2cmtqfgy1#
我不知道Kafka流目前有什么更好的方法,但它正在形成:
https://cwiki.apache.org/confluence/display/kafka/kip-150+-+kafka-streams+cogroup
xiozqbni2#
最终,我决定创建一个定制的轻量级joiner,它避免了物化并严格遵守过期时间。平均应该是o(1)。与流api相比,它更适合于consumer api:对于每个consumer,重复轮询并使用任何接收到的数据更新joiner;如果joiner返回一个完整的属性集,那么就转发它。代码如下: