I am trying to use window functions on a DataStream. My goal is to implement the following query:
SELECT *,
count(id) OVER(PARTITION BY country) AS c_country,
count(id) OVER(PARTITION BY city) AS c_city,
count(id) OVER(PARTITION BY city) AS c_addrs
FROM fm
ORDER BY country
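The following code lets me aggregate by the country field, but I need to aggregate by two fields within the same time window. I don't know whether keyBy() can take two or more keys.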
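import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Parse each CSV line into (id, country, city)
val parsed = stream2.map(x => {
  val arr = x.split(",")
  (arr(0).toInt, arr(1), arr(2))
})
parsed
  .keyBy(x => x._2) // key by country
  .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
  .process(new ProcessWindowFunction[
    (Int, String, String), (Int, String, String, Int), String, TimeWindow
  ]() {
    override def process(key: String, context: Context,
                         elements: Iterable[(Int, String, String)],
                         out: Collector[(Int, String, String, Int)]): Unit = {
      val lst = elements.toList
      // Emit each element with the number of elements seen for its country in this window
      lst.foreach(x => out.collect((x._1, x._2, x._3, lst.size)))
    }
  }).print().setParallelism(1)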
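This works fine for the first aggregation, but I am missing the second aggregation on the city field within the same time window.
Input data: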
10,"SPAIN","BARCELONA","C1"
20,"SPAIN","BARCELONA","C2"
30,"SPAIN","MADRID","C3"
30,"SPAIN","MADRID","C3"
80,"SPAIN","MADRID","C4"
90,"SPAIN","VALENCIA","C5"
40,"ITALY","ROMA","C6"
41,"ITALY","ROMA","C7"
42,"ITALY","VENECIA","C8"
50,"FRANCE","PARIS","C9"
60,"FRANCE","PARIS","C9"
70,"FRANCE","MARSELLA","C10"
Expected output:
(10,"SPAIN","BARCELONA",6,2,1)
(20,"SPAIN","BARCELONA",6,2,1)
(30,"SPAIN","MADRID",6,3,2)
(30,"SPAIN","MADRID",6,3,2)
(80,"SPAIN","MADRID",6,3,1)
(90,"SPAIN","VALENCIA",6,1,1)
(50,"FRANCE","PARIS",3,2,1)
(60,"FRANCE","PARIS",3,2,1)
(70,"FRANCE","MARSELLA",3,1,1)
(40,"ITALY","ROMA",3,2,2)
(41,"ITALY","ROMA",3,2,2)
(42,"ITALY","VENECIA",3,1,1)
3 Answers
Answer #1
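Since city is a subcategory of country, you can aggregate the stream by the city dimension first and then by the country dimension. If one dimension is not a sub-dimension of the other, you can combine the two dimensions into a new key and then implement the aggregation logic yourself in the process function.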
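To make the "city first, then country" idea concrete, here is a minimal sketch in plain Scala collections. It is not Flink code: it only treats a `Seq` as the contents of one window. The `Rec` case class, the `TwoLevelCount` object, and the field names are assumptions made up for illustration.

```scala
// Hypothetical record type; names are assumptions for illustration only.
final case class Rec(id: Int, country: String, city: String)

object TwoLevelCount {
  // Stage 1: count per (country, city), the finer dimension.
  // Stage 2: roll the city counts up to the country, the coarser dimension.
  def annotate(window: Seq[Rec]): Seq[(Int, String, String, Int, Int)] = {
    val cityCounts: Map[(String, String), Int] =
      window.groupBy(r => (r.country, r.city)).map { case (k, v) => k -> v.size }
    val countryCounts: Map[String, Int] =
      cityCounts
        .groupBy { case ((country, _), _) => country }
        .map { case (country, perCity) => country -> perCity.values.sum }
    // Attach both counts to every record of the window.
    window.map { r =>
      (r.id, r.country, r.city, countryCounts(r.country), cityCounts((r.country, r.city)))
    }
  }
}
```

The composite `(country, city)` key is also what you would key by when the two dimensions are not nested; the roll-up step is then replaced by your own aggregation logic.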
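Update
I don't think the results can be computed in a single process function, because you are trying to compute statistics over different keys. The only way to do it in one step is to set the global parallelism to 1 (so that all data reaches a single task even when a keyBy function is used) or to broadcast the input data to all downstream tasks. Since your computations share some common processing logic, it would be better to abstract that logic.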
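As a rough illustration of that abstraction, the sketch below factors the shared "count per key" logic into one helper and applies it to three different keys. It is plain Scala over a `Seq` that stands in for the full contents of one window, so it only applies when a single task sees every element (for example parallelism 1 or a non-keyed window). The `WindowCounts` object, the `Row` alias, and the assumed column order (id, country, city, address) are my own assumptions, not part of the original code.

```scala
object WindowCounts {
  // Assumed row layout: (id, country, city, address).
  type Row = (Int, String, String, String)

  // Shared logic: number of elements in the window for each key value.
  def countsBy[A, K](window: Seq[A])(key: A => K): Map[K, Int] =
    window.groupBy(key).map { case (k, v) => k -> v.size }

  // Annotates every row with its country, city and address counts.
  def annotate(window: Seq[Row]): Seq[(Int, String, String, Int, Int, Int)] = {
    val byCountry = countsBy(window)(_._2)
    val byCity    = countsBy(window)(_._3)
    val byAddr    = countsBy(window)(_._4)
    window.map(r => (r._1, r._2, r._3, byCountry(r._2), byCity(r._3), byAddr(r._4)))
  }
}
```

In a Flink job, a function like `annotate` could be called from a window function that receives the whole window at once; the price is that the work is no longer distributed by key.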
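By the way, I am not sure the output really matches your expectation. Since you did not implement a stateful process function, I think you are trying to compute the aggregated result for each batch of data, where each batch contains the data received within a 1-second time window. The output does not keep accumulating; every batch starts from zero.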
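The "every batch starts from zero" behaviour can be seen in a small plain-Scala sketch that assigns events to tumbling windows by timestamp and counts per window. This is a simulation rather than Flink code: the `TumblingBatches` object and the `(timestampMs, key)` event shape are assumptions for illustration, and timestamps are assumed to be non-negative.

```scala
object TumblingBatches {
  // Start of the tumbling window of length sizeMs that contains ts (ts >= 0 assumed).
  def windowStart(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs)

  // Counts events per (window start, key); nothing is carried over between windows.
  def countsPerWindow(events: Seq[(Long, String)], sizeMs: Long): Map[(Long, String), Int] =
    events
      .groupBy { case (ts, key) => (windowStart(ts, sizeMs), key) }
      .map { case (k, v) => k -> v.size }
}
```

With 1-second windows, two "SPAIN" events at 100 ms and 900 ms and one at 1200 ms give a count of 2 for the first window and 1 for the second, not a running total of 3. A running total would require keeping state across windows.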
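When using the timeWindow function, you also need to be aware that the TimeCharacteristic is processing time by default. The output may also be delayed because three consecutive window functions are used. Suppose the first process function finishes its aggregation within one second and forwards the results downstream. Since the second process function also has a 1-second timeWindow, it will not emit any result until it receives the next batch of output from upstream. Let's see whether others have a better solution to your problem.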
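Answer #2
I now want to aggregate over 3 columns. The option I am using is to chain the keyBy() outputs, but this can become very long and complex, and not very readable. Besides, I set a time window of Time.seconds(1), because without this window the output of the previous keyBy() arrives as single events.
What I am interested in is whether I can do these aggregations in a single process function.
My code is this long...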
Answer #3
------ Update 2