对于使用spark结构化流式处理的流,在过去的60分钟内保持包含前n条记录的更新表的最佳方法是什么?我只想每分钟更新一次我的表。我想我可以用 window
函数,将窗口持续时间设置为60分钟,将幻灯片持续时间设置为1分钟。问题是我得到了输出中所有滑动窗口的数据,但我只需要最后一个完全计算(关闭)的窗口。有办法做到这一点吗?或者我应该用另一种方式来解决这个问题(在过去的一个小时里保持几乎实时的排名)?
我的不完整解决方案:
val entityCounts = entities
.withWatermark("timestamp", "1 minute")
.groupBy(
window(col("timestamp"), "60 minutes", "1 minute")
.as("time_window"),
col("entity_type"),
col("entity"))
.count()
val query = entityCounts.writeStream
.foreachBatch( { (batchDF, _) =>
batchDF
.withColumn(
"row_number", row_number() over Window
.partitionBy("entity_type")
.orderBy(col("count").desc))
.filter(col("row_number") <= 5)
.select(
col("entity_type"),
col("entity"),
col("count").as("occurrences"))
.write
.cassandraFormat("table", "keyspace")
.mode("append")
.save
})
.outputMode("append")
.start()
暂无答案!
目前还没有任何答案,快来回答吧!