java—在ApacheFlink测试中是否有虚拟时间的概念,就像在reactor和rxjava中一样

6rvt4ljy  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(335)

在rxjava和reactor中,有虚拟时间的概念来测试依赖于时间的操作符。我不知道在Flink怎么做。例如,我把下面的例子放在一起,在这里我想玩一下迟到的事件,以了解它们是如何处理的。然而,我无法理解这样的测试会是什么样子?有没有办法把Flink和React堆结合起来,使试验更好?

public class PlayWithFlink {

    public static void main(String[] args) throws Exception {

        final OutputTag<MyEvent> lateOutputTag = new OutputTag<MyEvent>("late-data"){};

        // TODO understand how BoundedOutOfOrderness is related to allowedLateness
        BoundedOutOfOrdernessTimestampExtractor<MyEvent> eventTimeFunction = new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(MyEvent element) {
                return element.getEventTime();
            }
        };

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<MyEvent> events = env.fromCollection(MyEvent.examples())
                .assignTimestampsAndWatermarks(eventTimeFunction);

        AggregateFunction<MyEvent, MyAggregate, MyAggregate> aggregateFn = new AggregateFunction<MyEvent, MyAggregate, MyAggregate>() {
            @Override
            public MyAggregate createAccumulator() {
                return new MyAggregate();
            }

            @Override
            public MyAggregate add(MyEvent myEvent, MyAggregate myAggregate) {
                if (myEvent.getTracingId().equals("trace1")) {
                    myAggregate.getTrace1().add(myEvent);
                    return myAggregate;
                }
                myAggregate.getTrace2().add(myEvent);
                return myAggregate;
            }

            @Override
            public MyAggregate getResult(MyAggregate myAggregate) {
                return myAggregate;
            }

            @Override
            public MyAggregate merge(MyAggregate myAggregate, MyAggregate acc1) {
                acc1.getTrace1().addAll(myAggregate.getTrace1());
                acc1.getTrace2().addAll(myAggregate.getTrace2());
                return acc1;
            }
        };

        KeySelector<MyEvent, String> keyFn = new KeySelector<MyEvent, String>() {
            @Override
            public String getKey(MyEvent myEvent) throws Exception {
                return myEvent.getTracingId();
            }
        };

        SingleOutputStreamOperator<MyAggregate> result = events
                .keyBy(keyFn)
                .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
                .allowedLateness(Time.seconds(20))
                .sideOutputLateData(lateOutputTag)
                .aggregate(aggregateFn);

        DataStream lateStream = result.getSideOutput(lateOutputTag);

        result.print("SessionData");

        lateStream.print("LateData");

        env.execute();
    }
}

class MyEvent {
    private final String tracingId;
    private final Integer count;
    private final long eventTime;

    public MyEvent(String tracingId, Integer count, long eventTime) {
        this.tracingId = tracingId;
        this.count = count;
        this.eventTime = eventTime;
    }

    public String getTracingId() {
        return tracingId;
    }

    public Integer getCount() {
        return count;
    }

    public long getEventTime() {
        return eventTime;
    }

    public static List<MyEvent> examples() {
        long now = System.currentTimeMillis();
        MyEvent e1 = new MyEvent("trace1", 1, now);
        MyEvent e2 = new MyEvent("trace2", 1, now);
        MyEvent e3 = new MyEvent("trace2", 1, now - 1000);
        MyEvent e4 = new MyEvent("trace1", 1, now - 200);
        MyEvent e5 = new MyEvent("trace1", 1, now - 50000);
        return Arrays.asList(e1,e2,e3,e4, e5);
    }

    @Override
    public String toString() {
        return "MyEvent{" +
                "tracingId='" + tracingId + '\'' +
                ", count=" + count +
                ", eventTime=" + eventTime +
                '}';
    }
}

class MyAggregate {
    private final List<MyEvent> trace1 = new ArrayList<>();
    private final List<MyEvent> trace2 = new ArrayList<>();

    public List<MyEvent> getTrace1() {
        return trace1;
    }

    public List<MyEvent> getTrace2() {
        return trace2;
    }

    @Override
    public String toString() {
        return "MyAggregate{" +
                "trace1=" + trace1 +
                ", trace2=" + trace2 +
                '}';
    }
}

运行此命令的输出是:

SessionData:1> MyAggregate{trace1=[], trace2=[MyEvent{tracingId='trace2', count=1, eventTime=1551034666081}, MyEvent{tracingId='trace2', count=1, eventTime=1551034665081}]}
SessionData:3> MyAggregate{trace1=[MyEvent{tracingId='trace1', count=1, eventTime=1551034166081}], trace2=[]}
SessionData:3> MyAggregate{trace1=[MyEvent{tracingId='trace1', count=1, eventTime=1551034666081}, MyEvent{tracingId='trace1', count=1, eventTime=1551034665881}], trace2=[]}

不过,我希望看到 e5 在第一个事件触发前50秒发生的事件。

yx2lnoni

yx2lnoni1#

如果您将水印赋值器修改为这样

AssignerWithPunctuatedWatermarks eventTimeFunction = new AssignerWithPunctuatedWatermarks<MyEvent>() {
    long maxTs = 0;

    @Override
    public long extractTimestamp(MyEvent myEvent, long l) {
        long ts = myEvent.getEventTime();
        if (ts > maxTs) {
            maxTs = ts;
        }
        return ts;
    }

    @Override
    public Watermark checkAndGetNextWatermark(MyEvent event, long extractedTimestamp) {
        return new Watermark(maxTs - 10000);
    }
};

然后你就会得到你期望的结果。我不推荐这个——只是用它来说明发生了什么。
这里发生的事情是 BoundedOutOfOrdernessTimestampExtractor 是一个周期性水印生成器,它每200毫秒(默认情况下)只向流中插入一个水印。因为您的作业在此之前完成的时间很长,所以您的作业正在经历的唯一水印是flink在每个有限流的末尾注入的水印(具有值max\u水印)。延迟与水印有关,您希望延迟的事件是设法在水印之前到达的。
通过切换到标点水印,可以强制水印更频繁地出现,或者更精确地出现在流中的特定点。这通常是不必要的(并且过于频繁的水印会导致开销),但是当您想要对水印的顺序进行强有力的控制时,这是很有帮助的。
至于如何编写测试,您可以看看flink自己的测试中使用的测试工具,或者flinkspector。
更新:
与BoundedAutoFordernessTimestampExtractor相关联的时间间隔是一个规范,它说明了流的无序程度。到达该界限内的事件不会被认为是延迟的,并且事件时间计时器在该延迟过去之前不会触发,从而为无序事件的到达提供时间。allowedlateness只适用于windowapi,它描述了框架在超过正常窗口触发时间后保持窗口状态的时间,以便事件仍然可以添加到窗口并导致延迟触发。在这个额外的间隔之后,窗口状态被清除,随后的事件被发送到侧输出(如果配置的话)。

所以当你使用 BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) 你并不是说“在每次事件发生后等待10秒,以防之前的事件仍然会发生”。但你的意思是,你的事件最多应该有10秒的时间不正常。因此,如果您正在处理实时事件流,这意味着您最多要等待10秒,以防更早的事件到达(如果您正在处理历史数据,那么您可能能够在1秒内处理10秒的数据,或者不能——知道您将等待n秒的事件时间过去并不能说明实际需要多长时间。)
有关此主题的详细信息,请参见事件时间和水印。

相关问题