我正在使用Flink 1.11,我有以下测试用例来尝试基于事件时间的间隔连接。
两个数据流的数据定义如下:
object JoinStockInterval {
//the stocks data,
//ts is the implicit method that converts the time string to timestamp
val stocks = Seq(
Stock("id1", "2020-09-16 20:50:15".ts, 1),
Stock("id1", "2020-09-16 20:50:12".ts, 2),
Stock("id1", "2020-09-16 20:50:18".ts, 4),
Stock("id1", "2020-09-16 20:50:11".ts, 3),
Stock("id1", "2020-09-16 20:50:11".ts, 10),
Stock("id1", "2020-09-16 20:50:13".ts, 5),
Stock("id1", "2020-09-16 20:50:20".ts, 6),
Stock("id1", "2020-09-16 20:50:14".ts, 7),
Stock("id1", "2020-09-16 20:50:22".ts, 8),
Stock("id1", "2020-09-16 20:50:40".ts, 9),
Stock("id1", "2020-09-16 20:50:15".ts, 100)
)
//Mock that the stock name is changing over time
val stockNameChangings = Seq(
StockNameChanging("id1", "Stock1", "2020-09-16 20:50:16".ts),
StockNameChanging("id1", "Stock101", "2020-09-16 20:50:20".ts),
StockNameChanging("id1", "Stock4", "2020-09-16 20:50:17".ts),
StockNameChanging("id1", "Stock7", "2020-09-16 20:50:21".ts),
StockNameChanging("id1", "Stock5", "2020-09-16 20:50:17".ts),
StockNameChanging("id1", "Stock501", "2020-09-16 20:50:22".ts),
StockNameChanging("id1", "Stock6", "2020-09-16 20:50:23".ts)
)
}
测试用例定义如下,每个用例允许延迟4秒:
test("test interval join inner 2 works") {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val ds1 = env.addSource(new IntervalJoinStockSource(emitInterval = 0)).assignTimestampsAndWatermarks(new StockWatermarkGenerator(4000)) //allow 4 secs lateness
val ds2 = env.addSource(new IntervalJoinStockNameChangingSource(emitInterval = 0)).assignTimestampsAndWatermarks(new StockNameChangingWatermarkGenerator(4000)) //allow 4 secs lateness
val tenv = StreamTableEnvironment.create(env)
tenv.createTemporaryView("s1", ds1, $"id", $"price", $"trade_date".rowtime() as "rt1")
tenv.createTemporaryView("s2", ds2, $"id", $"name", $"trade_date".rowtime() as "rt2")
tenv.from("s1").printSchema()
tenv.from("s2").printSchema()
val sql =
"""
select s1.id, s2.name, s1.price, cast (s1.rt1 as timestamp) as rt1, s2.rt2
from s1 join s2
on s1.id = s2.id
where s1.rt1 between s2.rt2 - interval '2' second and s2.rt2 + interval '2' second
""".stripMargin(' ')
tenv.sqlQuery(sql).toAppendStream[Row].print()
env.execute()
}
拼接结果如下:
id1,Stock1,1.0,2020-09-16T12:50:15,2020-09-16T12:50:16
id1,Stock1,4.0,2020-09-16T12:50:18,2020-09-16T12:50:16
id1,Stock101,4.0,2020-09-16T12:50:18,2020-09-16T12:50:20
id1,Stock4,4.0,2020-09-16T12:50:18,2020-09-16T12:50:17
id1,Stock4,1.0,2020-09-16T12:50:15,2020-09-16T12:50:17
id1,Stock5,4.0,2020-09-16T12:50:18,2020-09-16T12:50:17
id1,Stock5,1.0,2020-09-16T12:50:15,2020-09-16T12:50:17
id1,Stock101,6.0,2020-09-16T12:50:20,2020-09-16T12:50:20
id1,Stock7,6.0,2020-09-16T12:50:20,2020-09-16T12:50:21
id1,Stock501,6.0,2020-09-16T12:50:20,2020-09-16T12:50:22
id1,Stock1,7.0,2020-09-16T12:50:14,2020-09-16T12:50:16
id1,Stock101,8.0,2020-09-16T12:50:22,2020-09-16T12:50:20
id1,Stock501,8.0,2020-09-16T12:50:22,2020-09-16T12:50:22
id1,Stock7,8.0,2020-09-16T12:50:22,2020-09-16T12:50:21
id1,Stock6,8.0,2020-09-16T12:50:22,2020-09-16T12:50:23
id1,Stock1,100.0,2020-09-16T12:50:15,2020-09-16T12:50:16
id1,Stock4,100.0,2020-09-16T12:50:15,2020-09-16T12:50:17
id1,Stock5,100.0,2020-09-16T12:50:15,2020-09-16T12:50:17
上面结果中的最后一条记录很奇怪,它来自股票流中的Stock("id1", "2020-09-16 20:50:15".ts, 100)
,但是这条记录在股票流中已经晚了。请看股票流中的以下两条记录,我想问为什么这条记录没有被删除,而是成功地与另一个流(名称更改流)连接在一起。
Stock("id1", "2020-09-16 20:50:40".ts, 9),
Stock("id1", "2020-09-16 20:50:15".ts, 100)
水印策略使用AssignerWithPunctuatedWatermarks
1条答案
按热度按时间t3psigkw1#
你想知道的记录
从连接的Angular 来看并不晚。
其原因与水印在操作符具有多个输入的情况下如何传播有关(如区间连接),连接操作符的当前水印始终是从所有输入通道接收到的水印中最小的水印。
因此,在连接处理完此记录之前
连接处的水印由该记录确定
因此水印仍然福尔斯在为连接定义的间隔内。
水印以这种方式工作,因为它们表示一个Assert,即流现在可以被认为是完整的,直到水印的时间戳。从连接的Angular 来看,它只具有完整的知识,直到最后面的流的水印。