在使用时,如何找到与其他事件不匹配的事件流 CoGroupFunction
?
让我们考虑一下人们是通过电话交流的。在 Tuple2<String, Integer>
, f0
是人名和 f1
是他们呼叫或接听电话的电话号码。我们使用 coGroup
然而,我们却在想念那些接到外界电话的人。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Tuple2<String, Integer>> callers = env.fromElements(
new Tuple2<String, Integer>("alice->", 12), // alice dials 12
new Tuple2<String, Integer>("bob->", 13), // bob dials 13
new Tuple2<String, Integer>("charlie->", 19))
.assignTimestampsAndWatermarks(new TimestampExtractor(Time.seconds(5)));
DataStream<Tuple2<String, Integer>> callees = env.fromElements(
new Tuple2<String, Integer>("->carl", 12), // carl received call
new Tuple2<String, Integer>("->ted", 13),
new Tuple2<String, Integer>("->chris", 7))
.assignTimestampsAndWatermarks(new TimestampExtractor(Time.seconds(5)));;
DataStream<Tuple1<String>> groupedStream = callers.coGroup(callees)
.where(evt -> evt.f1).equalTo(evt -> evt.f1)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply(new IntEqualCoGroupFunc());
groupedStream.print(); // prints 1> (alice->-->carl) \n 1> (bob->-->ted)
//DataStream<Tuple1<String>> notGroupedStream = ..; // people without pairs in last window
//notGroupedStream.print(); // should print charlie->-->someone \n someone->-->chris
env.execute();
1条答案
按热度按时间nc1teljy1#
老实说,最简单的解决办法似乎是改变
IntEqualCoGroupFunc
,因此String
它回来了(Boolean, String)
. 这是因为coGroup
也处理那些没有匹配键的元素,这些元素将有一个匹配键Iterable
函数中为空coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out)
i、 对于你的情况,它会收到("->chris", 7)
作为first
而且是空的Iterable
作为second
.签名的更改允许您轻松地发出没有匹配密钥的结果,并在后期处理时将它们简单地拆分为单独的流。
输出如下: