apache-flink异步请求和windows

i7uaboj4  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(285)

我无法选择合适的windows函数/赋值器。任务如下。首先,我从具有请求id和一些数据的源中获取数据,并对外部数据库执行异步请求。

// Here String is for request_id, Data is for treated data
DataStream Tuple2<String, Data> stream = ...

// async I/O queries
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWait(
    stream,
    new AsyncDatabaseRequest(),
    1000,
    TimeUnit.MILLISECONDS,
    100
);

现在我想按请求id收集所有数据并进行一些计算。

DataStream Tuple2<String, Integer> = result
    .map(val -> new Tuple2<String, Integer>(val.f0, val.f1.data_int))
    .keyBy(0)
    .window(...)
    .sum(1);

问题是窗口函数。我需要每个窗口包含具有相同请求id的所有数据点,但是异步查询的时间可能从毫秒到分钟不等。另一方面,我需要低延迟,所以我不能使用 ProcessingTimeSessionWindows.withGap(Time.minutes(10)) . 我需要在从异步函数获得最后一个数据后立即执行计算。
对我来说最好的方法是使用async函数中的窗口水印,它当然知道每个查询何时完成以及它所承受的马赫数。这是可能的吗?对于这样的任务,最佳做法是什么?

but5z9lq

but5z9lq1#

嗯,我找到了解决办法,看起来很简单。我只是利用事件时间。在我的源函数中,我生成事件时间戳和水印,如下所示:

Long ts = System.currentTimeMillis();
ctx.collectWithTimestamp(data, ts);
ctx.emitWatermark(new Watermark(ts + 1));

在流中,我使用eventtime函数:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<...> dataStream = ...;

DataStream<...> newStream = dataStream
    .keyBy(0)
    .timeWindow(Time.milliseconds(1))
    .reduce(new Reducer());

这样我就避免了超时,结果马上就准备好了。

相关问题