flink-event时间滑动窗口,由于时间间隔,窗口中缺少数据

xesrikrc  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(400)

假设我有一系列股票市场交易事件,比如:

technical1, ALXN, 1/1/2016
technical1, CELG, 1/1/2016
technical2, ALXN, 1/2/2016
technical2, CELG, 1/2/2016
. . . 
technicalN, ALXN, 4/1/2018
technicalN, CELG, 4/1/2018

这样,technicaln(其中n是某个数字)表示给定公司的日终股票市场交易数据的第n个技术交易条目[开盘(float)、高盘(float)、低盘(float)、收盘(float)、成交量(int)](i、 例如,ticker goog的technical1不同于ticker msft的technical1),例如:

12.52, 19.25, 09.11, 17.54, 120532, GOOG, 1/1/2017
14.37, 29.52, 01.53, 12.96, 627156, MSFT, 1/1/2017

(请注意,这些交易价格/交易量完全是虚构的。)
假设我想创建一个间隔为1天的大小为2的窗口,这样我们的数据看起来像这样:

[technical1, GOOG, 12/26/2017; technical2, GOOG, 12/27/2017]
[technical1, MSFT, 12/26/2017; technical2, MSFT, 12/27/2017]
[technical2, GOOG, 12/27/2017; technical3, GOOG, 12/28/2017]
[technical2, MSFT, 12/27/2017; technical3, MSFT, 12/28/2017]
[technical3, GOOG, 12/28/2017; technical4, GOOG, 12/29/2017]
[technical3, MSFT, 12/28/2017; technical4, MSFT, 12/29/2017]
[technical4, GOOG, 12/29/2017; technical5, GOOG, 12/30/2017]
[technical4, MSFT, 12/29/2017; technical5, MSFT, 12/30/2017]
[technical5, GOOG, 12/30/2017; technical6, GOOG, 12/31/2017]
[technical5, MSFT, 12/30/2017; technical6, MSFT, 12/31/2017]
[technical6, GOOG, 12/31/2017; technical7, GOOG, 01/01/2018]
[technical6, MSFT, 12/31/2017; technical7, MSFT, 01/01/2018]
[technical7, GOOG, 01/01/2018; technical8, GOOG, 01/02/2018]
[technical7, MSFT, 01/01/2018; technical8, MSFT, 01/02/2018]
[technical8, GOOG, 01/02/2018; technical9, GOOG, 01/03/2018]
[technical8, MSFT, 01/02/2018; technical9, MSFT, 01/03/2018]
[. . .]
[technicalN, GOOG, 04/01/2018; technicalN+1, GOOG, 04/02/2018]
[technicalN, MSFT, 04/01/2018; technicalN+1, MSFT, 04/02/2018]
. . .

这将是很好的,但这是有问题的,因为股市交易日期是不连续的。。。换句话说,如果我正确理解了flink的机制(我可能错了),那么使用事件时间滑动窗口的问题是:

DataStream<T> input = ...;

// sliding event-time windows
input
.keyBy((TechnicalDataEntry technical) -> technical.ticker)
.window(SlidingEventTimeWindows.of(Time.day(2), Time.day(1))) // Window size of 2 days, sliding interval of 1 day
.<windowed transformation>(<window function>);

在这样的数据上,日期值是不连续的(这意味着它们遵循一个离散序列,其中包含一个或多个缺失日的不连续性),因为没有股票市场关闭日期的股票市场数据,例如在节假日或周末。因此,考虑到这一点,我们的数据流最终看起来更像这样(因为交易在2017年12月30日、2017年12月31日和2018年1月1日结束):

[technical1, GOOG, 12/26/2017; technical2, GOOG, 12/27/2017]
[technical1, MSFT, 12/26/2017; technical2, MSFT, 12/27/2017]
[technical2, GOOG, 12/27/2017; technical3, GOOG, 12/28/2017]
[technical2, MSFT, 12/27/2017; technical3, MSFT, 12/28/2017]
[technical3, GOOG, 12/28/2017; technical4, GOOG, 12/29/2017]
[technical3, MSFT, 12/28/2017; technical4, MSFT, 12/29/2017]
[technical4, GOOG, 12/29/2017; NULL]
[technical4, MSFT, 12/29/2017; NULL]
[NULL; NULL]
[NULL; NULL]
[NULL; NULL]
[NULL; NULL]
[NULL; technical8, GOOG, 01/02/2018]
[NULL; technical8, MSFT, 01/02/2018]
[technical8, GOOG, 01/02/2018; technical9, GOOG, 01/03/2018]
[technical8, MSFT, 01/02/2018; technical9, MSFT, 01/03/2018]
[. . .]
[technicalN, GOOG, 04/01/2018; technicalN+1, GOOG, 04/02/2018]
[technicalN, MSFT, 04/01/2018; technicalN+1, MSFT, 04/02/2018]

如何让我的flink流忽略丢失的日期(而不是窗口、连接或Map连续的非丢失日期),使我的流看起来像这样:

[technical1, GOOG, 12/26/2017; technical2, GOOG, 12/27/2017]
[technical1, MSFT, 12/26/2017; technical2, MSFT, 12/27/2017]
[technical2, GOOG, 12/27/2017; technical3, GOOG, 12/28/2017]
[technical2, MSFT, 12/27/2017; technical3, MSFT, 12/28/2017]
[technical3, GOOG, 12/28/2017; technical4, GOOG, 12/29/2017]
[technical3, MSFT, 12/28/2017; technical4, MSFT, 12/29/2017]
[technical4, GOOG, 12/29/2017; technical5, GOOG, 01/02/2018]
[technical4, MSFT, 12/29/2017; technical5, MSFT, 01/02/2018]
[technical5, GOOG, 01/02/2018; technical6, GOOG, 01/03/2018]
[technical5, MSFT, 01/02/2018; technical6, MSFT, 01/03/2018]
[. . .]
[technicalN, GOOG, 04/01/2018; technicalN+1, GOOG, 04/02/2018]
[technicalN, MSFT, 04/01/2018; technicalN+1, MSFT, 04/02/2018]

?
(注意:请忽略我用字符串“technical”(如technical1、technical2等)递增数字的方式,因为正如我前面提到的,这个值在本文中只是用于描述性目的,实际上并不存在于数据中。确定两个交易条目是否连续的唯一方法是按股票代码对它们进行分组并按交易日期排序。假设不存在重复的事件。)

ldfqzlk8

ldfqzlk81#

如果我理解正确,你的问题是,因为有一定的时期,当你没有收到事件,那么窗口将不会正常运行,因为他们不知道时间的推移。
您可以选择沿周向发射水印,如下所示:

streamEnvironment.addSource(new SourceFunction<Object>() {
        @Override
        public void run(final SourceContext<Object> ctx) {
            (...)

            ctx.emitWatermark(new Watermark(timestamp));
        }

        @Override
        public void cancel() {

        }
    })

请记住,如果您在水印之前接收到事件,它们将被忽略,因此水印发射的周期性是“窗口准确性”(尽可能快地触发)和容忍后期事件之间的权衡。

相关问题