如何在Apache Flink中基于第二个键拆分窗口?

pu82cl6c  于 2023-01-19  发布在  Apache
关注(0)|答案(1)|浏览(149)

我正在尝试创建产品扫描仪的数据流处理,该扫描仪以以下Tuple 4的形式生成事件:时间戳(长,以毫秒为单位)、客户端ID(int)、产品ID(int)、数量(int)。
最后,应当获得元组3的流:ClientID(int)、ProductID(int)、Quantity(int),表示具有给定ClientID的客户购买的具有相同ProductID的所有产品的分组。对于任何“交易”,产品扫描之间的最大间隔可以是10秒。
这是一小段代码,展示了我最初的尝试:

DataStream<Tuple4<Long, Integer, Integer, Integer>> inStream = ...;

        WindowedStream<Tuple4<Long, Integer, Integer, Integer>, Integer, TimeWindow> windowedStream = inStream
            .keyBy((tuple) -> Tuple2.of(tuple.f1, tuple.f2))
            .window(EventTimeSessionWindows.withGap(Time.seconds(10)));
        
        windowedStream.aggregate(...); // Drop timestamp, sum quantity, keep the rest the same

但是,这就是问题所在。通常,SessionWindow就足够了,但在本例中,它在具有密钥(ClientID、ProductID)的2个事件之间实现了10秒的间隔,这不是预期的。
如果我们想象下面的元组进来:
1.(10_000,1,1,1)〈6秒间隔〉
1.(16_000,1,2,1)〈6秒间隔〉
1.(22_000,1,1,1)〈6秒间隔〉
1.(28 - 000,一,二,一)
元组序列应在同一SessionWindow中,并且1和2应分别与3和4合并,从而生成两个输出事件。但是,它们不在同一SessionWindow中,因为1+3和2+4被keyBy拆分为各自的流,并且由于它们不满足产品之间最长10秒的要求,因此未聚集它们。
我想知道是否有一种方法可以通过应用“第二个”密钥来解决这个问题。首先,应该基于密钥ClientID来拆分流,然后应该应用SessionWindow(与产品无关)。随后,我想知道是否有一种方法可以使用第二个键细分ClientID键控的SessionWindows(将是ProductID)并有效地到达与之前相同的键(ClientID,ProductID),而没有之前的问题。然后,可以正常地应用聚合以到达预期的输出流。
如果这不可能,有没有其他解决办法?

holgip5t

holgip5t1#

解决这个问题最简单的方法是基于ClientID进行分区,以捕获特定客户端完成的所有扫描,然后使用process,这将给予您能够访问特定窗口中的所有元素,在该窗口中,您可以为每个ProductID生成单独的事件或输出。是否有任何原因导致这在您的设置中不起作用?

相关问题