I want to run a Structured Streaming query with a watermark, but it does not seem to work:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming._
// Stream CSV files and declare a 1-second watermark on the event-time column
val in = spark.readStream.schema("id INT, time TIMESTAMP").csv("data/sensor/*.txt").withWatermark("time", "1 seconds")
in.createOrReplaceTempView("in")
// Count events per id over 5-second tumbling windows
val in2 = spark.sql("select id, window, count(1), collect_list(concat(time)) from in group by id, window(time, '5 seconds') order by window, id")
// Console sink, complete output mode, triggered every second
val writer = in2.writeStream.format("console").option("truncate", "false").outputMode("complete").trigger(Trigger.ProcessingTime("1 second")).option("checkpointLocation", "chktmp5")
val stream = writer.start()
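To see which watermark the engine is actually tracking, the query's last progress can be printed from the same spark-shell session; the StreamingQueryProgress JSON contains an eventTime/watermark entry once rows have been processed (this check is standard Spark API, not part of my original run):

// Prints JSON including "eventTime" : { "watermark" : ... } after data arrives
println(stream.lastProgress)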
I use a folder of CSV files as the streaming source, and I generate one CSV file at each chosen processing time according to the following plan (a sketch of the generator follows the table):
event time    processing time
08:57:01      08:57:11
08:57:03      08:57:13
08:57:01      08:57:25
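A minimal sketch of such a generator, runnable from a second scala or spark-shell session (the sleep lengths and the data/sensor path match the plan above; the emit helper is illustrative, not my exact script):

import java.nio.file.{Files, Paths}
import java.nio.charset.StandardCharsets

val dir = Paths.get("data/sensor")
Files.createDirectories(dir)
// Write one event per file; the sleeps control the processing (arrival) time,
// while the event time is the fixed timestamp inside each line.
def emit(name: String, line: String): Unit =
  Files.write(dir.resolve(name), (line + "\n").getBytes(StandardCharsets.UTF_8))

Thread.sleep(10000)  // arrives ~08:57:11
emit("1.txt", "3,2021-03-25 08:57:01")
Thread.sleep(2000)   // arrives ~08:57:13
emit("2.txt", "2,2021-03-25 08:57:03")
Thread.sleep(12000)  // arrives ~08:57:25; event time 08:57:01 is now 24 seconds old
emit("3.txt", "3,2021-03-25 08:57:01")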
After the run finishes, the processing times can be read back from the file modification times:
me@ubuntu:~/Downloads/spark-3.0.1-bin-hadoop2.7/bin/data/sensor$ ls --full-time
total 24
-rw-rw-r-- 1 yz yz 22 2021-03-25 08:57:11.811041590 -0700 1.txt
-rw-rw-r-- 1 yz yz 22 2021-03-25 08:57:13.811052616 -0700 2.txt
-rw-rw-r-- 1 yz yz 22 2021-03-25 08:57:25.815118132 -0700 3.txt
1.txt contains:
3,2021-03-25 08:57:01
2.txt contains:
2,2021-03-25 08:57:03
3.txt contains:
3,2021-03-25 08:57:01
Clearly the event with event time 08:57:01 and processing time 08:57:25 is too late and should be dropped: with a 1-second delay, the watermark after the second file should be 08:57:03 - 1s = 08:57:02, which is already past that event time. Yet the event is not dropped. I am using the console sink, and I can see:
-------------------------------------------
Batch: 2
-------------------------------------------
+---+------------------------------------------+--------+------------------------------------------+
|id |window |count(1)|collect_list(concat(CAST(time AS STRING)))|
+---+------------------------------------------+--------+------------------------------------------+
|2 |[2021-03-25 08:57:00, 2021-03-25 08:57:05]|1 |[2021-03-25 08:57:03] |
|3 |[2021-03-25 08:57:00, 2021-03-25 08:57:05]|2 |[2021-03-25 08:57:01,2021-03-25 08:57:01] |
+---+------------------------------------------+--------+------------------------------------------+
So it seems to me that the watermark has no effect. Why?
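For comparison, here is the same aggregation rewritten with the DataFrame API in update output mode, the setting in which the Structured Streaming guide describes watermark-based dropping of late rows (the order by is removed because sorting is only supported in complete mode; this variant and its checkpoint directory are illustrative, not part of my original run):

import spark.implicits._

// Same 5-second tumbling-window count, but emitted in update output mode
val agg = in.groupBy($"id", window($"time", "5 seconds")).count()

val updateStream = agg.writeStream
  .format("console")
  .option("truncate", "false")
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("1 second"))
  .option("checkpointLocation", "chktmp5_update")  // hypothetical separate checkpoint dir
  .start()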