最近我尝试使用apacheflink进行快速批处理。我有一张table和一张tablecolumn:value and 不相关的索引列
基本上我想计算每5行值的平均值和范围。然后我将根据我刚刚计算的平均值来计算平均值和标准差。所以我想最好的办法就是 Tumble
Windows。
看起来像这样
DataSet<Tuple2<Double, Integer>> rawData = {get the source data};
Table table = tableEnvironment.fromDataSet(rawData);
Table groupedTable = table
.window(Tumble.over("5.rows").on({what should I write?}).as("w")
.groupBy("w")
.select("f0.avg, f0.max-f0.min");
{The next step is to use groupedTable to calculate overall mean and stdDev}
但我不知道该写什么 .on()
. 我试过了 "proctime"
但它表示,目前还没有这样的投入。我只希望它在从源代码读取时按顺序分组。但它必须是一个时间属性,所以我不能使用 "f2"
-索引列也按顺序排列。
我必须添加时间戳才能这样做吗?在批处理中有必要吗?它会减慢计算速度吗?解决这个问题的最好办法是什么?
更新:我试着在表api中使用滑动窗口,结果得到了一个异常。
// Calculate mean value in each group
Table groupedTable = table
.groupBy("f0")
.select("f0.cast(LONG) as groupNum, f1.avg as avg")
.orderBy("groupNum");
//Calculate moving range of group Mean using sliding window
Table movingRangeTable = groupedTable
.window(Slide.over("2.rows").every("1.rows").on("groupNum").as("w"))
.groupBy("w")
.select("groupNum.max as groupNumB, (avg.max - avg.min) as MR");
例外情况是:
线程“main”java.lang.unsupportedoperationexception中出现异常:当前不支持事件时间上的计数滑动组窗口。
在org.apache.flink.table.plan.nodes.dataset.datasetwindowaggegate.createeventtimeslidingwindowdataset(datasetwindowaggegate。scala:456)
在org.apache.flink.table.plan.nodes.datasetwindowaggregate.translatetoplan(datasetwindowaggregate。scala:139)
...
这是否意味着表api中不支持滑动窗口?如果我没记错的话,dataset api中没有窗口函数。那么如何计算间歇过程中的移动范围呢?
1条答案
按热度按时间drnojrws1#
这个
window
子句用于定义基于窗口函数的分组,例如Tumble
或者Session
. 表api(或sql)中没有很好地定义每5行分组一次,除非指定行的顺序。这是在on
合同条款Tumble
功能。由于此功能源自流处理,因此on
子句需要timestamp属性。您可以使用
currentTimestamp()
功能。但是,我应该指出,flink将对数据进行排序,因为它不知道函数的单调性。此外,所有这些都将使用1的并行性,因为没有允许分区的子句。或者,还可以实现一个用户定义的标量函数,将索引属性转换为时间戳(实际上是一个长值)。不过,Flink会做一个完整的数据排序。