Finding the events left unmatched by a CoGroupFunction

a14dhokn posted on 2021-06-21 in Flink

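When using a CoGroupFunction, how can I find the events that did not match any event in the other stream?
Consider people communicating by phone. In Tuple2<String, Integer>, f0 is the person's name and f1 is the phone number they called or received a call on. We pair callers with callees using coGroup, but people without a counterpart (for example, someone who received a call from outside our data) are missing from the result.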

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Tuple2<String, Integer>> callers = env.fromElements(
        new Tuple2<String, Integer>("alice->", 12), // alice dials 12
        new Tuple2<String, Integer>("bob->", 13),   // bob dials 13
        new Tuple2<String, Integer>("charlie->", 19))
        .assignTimestampsAndWatermarks(new TimestampExtractor(Time.seconds(5)));

DataStream<Tuple2<String, Integer>> callees = env.fromElements(
        new Tuple2<String, Integer>("->carl", 12), // carl received call
        new Tuple2<String, Integer>("->ted", 13),
        new Tuple2<String, Integer>("->chris", 7))
        .assignTimestampsAndWatermarks(new TimestampExtractor(Time.seconds(5)));

DataStream<Tuple1<String>> groupedStream = callers.coGroup(callees)
        .where(evt -> evt.f1).equalTo(evt -> evt.f1)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .apply(new IntEqualCoGroupFunc());

groupedStream.print(); // prints 1> (alice->-->carl) \n 1> (bob->-->ted)

//DataStream<Tuple1<String>> notGroupedStream = ..; // people without pairs in last window
//notGroupedStream.print(); // should print charlie->-->someone \n someone->-->chris

env.execute();

Answer 1 (nc1teljy)

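Honestly, the simplest solution seems to be to change IntEqualCoGroupFunc so that instead of a String it returns (Boolean, String). The reason is that coGroup also processes elements that have no matching key: for such a key, one of the two Iterables passed to coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) is empty. In your case, the function is called with ("->chris", 7) in second (callees is the second input of callers.coGroup(callees)) and an empty Iterable as first.
Changing the signature this way lets you also emit the results that have no matching key, and then simply split them into separate streams by the Boolean flag in later processing.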

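// Implementation of IntEqualCoGroupFunc
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class IntEqualCoGroupFunc implements
        CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple1<String>> {

    @Override
    public void coGroup(Iterable<Tuple2<String, Integer>> outbound, Iterable<Tuple2<String, Integer>> inbound,
            Collector<Tuple1<String>> out) throws Exception {

        boolean hasOutbound = outbound.iterator().hasNext();
        boolean hasInbound = inbound.iterator().hasNext();

        if (hasOutbound && hasInbound) {
            // matching pairs: emit every caller/callee combination for this number
            for (Tuple2<String, Integer> outboundObj : outbound) {
                for (Tuple2<String, Integer> inboundObj : inbound) {
                    out.collect(Tuple1.of(outboundObj.f0 + "-" + inboundObj.f0));
                }
            }
        } else if (hasOutbound) {
            // inbound is empty: nobody in the callee stream received this call
            for (Tuple2<String, Integer> outboundObj : outbound) {
                out.collect(Tuple1.of(outboundObj.f0 + "->someone"));
            }
        } else {
            // outbound is empty: the call came from outside the caller stream
            // (a windowed coGroup is never invoked with both sides empty)
            for (Tuple2<String, Integer> inboundObj : inbound) {
                out.collect(Tuple1.of("someone->-" + inboundObj.f0));
            }
        }
    }
}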

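The output is as follows (the N> prefix is the index of the parallel subtask that printed the line, so the order may differ between runs):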

2> (someone->-->chris)
2> (charlie->->someone)
1> (alice->-->carl)
1> (bob->-->ted)
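The (Boolean, String) variant suggested in the answer can be sketched without a Flink cluster. The following is a plain-Java simulation under stated assumptions, not Flink code: it groups both example inputs by phone number as if they all fell into one tumbling window, calls a co-group method once per key (one side may be empty, as in a windowed coGroup), tags each result with matched = true/false, and then splits the tagged results into two lists. The class TaggedCoGroupSketch and its helpers Call, coGroup and run are hypothetical names; the unmatched labels follow the question's expected output ("charlie->-->someone").

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Flink-free simulation of a windowed coGroup that tags results as matched/unmatched.
// Hypothetical names: TaggedCoGroupSketch, Call, coGroup, run.
public class TaggedCoGroupSketch {

    // Stand-in for Tuple2<String, Integer>: (label, phone number)
    static final class Call {
        final String label;
        final int number;
        Call(String label, int number) { this.label = label; this.number = number; }
    }

    // Mirrors the body of a CoGroupFunction emitting (Boolean, String):
    // called once per key, and one of the two lists may be empty.
    static void coGroup(List<Call> outbound, List<Call> inbound,
                        List<Map.Entry<Boolean, String>> out) {
        if (!outbound.isEmpty() && !inbound.isEmpty()) {
            for (Call o : outbound)
                for (Call i : inbound)
                    out.add(new AbstractMap.SimpleEntry<>(true, o.label + "-" + i.label));
        } else if (!outbound.isEmpty()) {
            for (Call o : outbound)
                out.add(new AbstractMap.SimpleEntry<>(false, o.label + "-->someone"));
        } else {
            for (Call i : inbound)
                out.add(new AbstractMap.SimpleEntry<>(false, "someone->-" + i.label));
        }
    }

    // Groups both inputs by number (assumed to share one window) and invokes
    // coGroup once per key present on either side, in ascending key order.
    static List<Map.Entry<Boolean, String>> run(List<Call> callers, List<Call> callees) {
        Map<Integer, List<Call>> left = new TreeMap<>();
        Map<Integer, List<Call>> right = new TreeMap<>();
        for (Call c : callers) left.computeIfAbsent(c.number, k -> new ArrayList<>()).add(c);
        for (Call c : callees) right.computeIfAbsent(c.number, k -> new ArrayList<>()).add(c);

        Set<Integer> keys = new TreeSet<>(left.keySet());
        keys.addAll(right.keySet());

        List<Map.Entry<Boolean, String>> out = new ArrayList<>();
        for (int key : keys) {
            coGroup(left.getOrDefault(key, List.of()), right.getOrDefault(key, List.of()), out);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Call> callers = List.of(new Call("alice->", 12), new Call("bob->", 13), new Call("charlie->", 19));
        List<Call> callees = List.of(new Call("->carl", 12), new Call("->ted", 13), new Call("->chris", 7));

        // "Split" the tagged results by the Boolean flag
        List<String> matched = new ArrayList<>();
        List<String> unmatched = new ArrayList<>();
        for (Map.Entry<Boolean, String> e : run(callers, callees)) {
            if (e.getKey()) {
                matched.add(e.getValue());
            } else {
                unmatched.add(e.getValue());
            }
        }
        System.out.println("matched:   " + matched);   // [alice->-->carl, bob->-->ted]
        System.out.println("unmatched: " + unmatched); // [someone->-->chris, charlie->-->someone]
    }
}
```

In Flink itself, the split would be applied to the DataStream<Tuple2<Boolean, String>> returned by apply(...), for example with tagged.filter(t -> t.f0) and tagged.filter(t -> !t.f0), or with a ProcessFunction that routes unmatched records to a side output.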
