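I am trying to implement a toy stream-stream join with Spark 2.3.0.
The stream join works fine when the condition matches, but when it does not match, the left stream's values are lost even though I use a left outer join.
Thanks in advance.
Here are my source code and data. Basically, I create two sockets: port 9999 is the source for the right stream and port 9998 is the source for the left stream.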
import java.sql.Timestamp

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.expr

// The enclosing object and main method were not part of the posted snippet;
// the name StreamStreamJoin is a placeholder.
object StreamStreamJoin {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("StreamStream")
      .master("local")
      .getOrCreate()
    import spark.implicits._
    spark.sparkContext.setLogLevel("ERROR")

    // Right stream (port 9999): each line is "id,timestamp".
    val s9999: DataFrame = spark
      .readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
    val s9999Dataset: Dataset[S9999] = s9999
      .map(line => {
        val strings = line.get(0).toString.split(",")
        val id = strings(0).toInt
        val time = Timestamp.valueOf(strings(1))
        S9999(id, time)
      })
      .withWatermark("timestamp99", "30 seconds")

    // Left stream (port 9998): parsed the same way, but no watermark is defined here.
    val s9998Dataset: Dataset[S9998] = spark
      .readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9998)
      .load()
      .map(line => {
        val strings = line.get(0).toString.split(",")
        val id = strings(0).toInt
        val time = Timestamp.valueOf(strings(1))
        S9998(id, time)
      })

    // Left outer join on id, with the left event time within 6 seconds after the right event time.
    val resultDataset = s9998Dataset
      .join(s9999Dataset,
        joinExprs = expr(
          """
          id99 = id98 AND
          timestamp98 >= timestamp99 AND
          timestamp98 <= timestamp99 + interval 6 seconds
          """),
        joinType = "leftOuter")

    val streamingQuery = resultDataset
      .writeStream
      .outputMode("append")
      .format("console")
      .start()
    streamingQuery.awaitTermination()
  }
}

case class S9999(id99: Int, timestamp99: Timestamp)
case class S9998(id98: Int, timestamp98: Timestamp)
Sample data:
Left socket:
1,2011-10-02 18:50:20.123
2,2011-10-02 18:50:25.123
3,2011-10-02 18:50:30.123
4,2011-10-02 18:50:35.123
5,2011-10-02 18:50:40.123
6,2011-10-02 18:50:45.123
7,2011-10-02 18:50:50.123
8,2011-10-02 18:50:55.123
9,2011-10-02 18:51:00.123
10,2011-10-02 18:51:05.123
11,2011-10-02 18:51:10.123
12,2011-10-02 18:51:15.123
13,2011-10-02 18:51:20.123
14,2011-10-02 18:51:25.123
15,2011-10-02 18:51:30.123
Right stream data:
1,2011-10-02 18:50:20.123
3,2011-10-02 18:50:30.123
7,2011-10-02 18:50:50.123
8,2011-10-02 18:50:55.123
9,2011-10-02 18:51:00.123
13,2011-10-02 18:51:20.123
14,2011-10-02 18:51:25.123
15,2011-10-02 18:51:30.123
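For reference, the result a batch-style left outer join would give on this sample can be worked out in plain Scala. This is only a sketch without Spark: the object name ExpectedLeftOuterJoin and the helpers matches, leftOuterJoin and tsFor are made up for illustration, and the sample rows are regenerated from the pattern above (ids 1 to 15, five seconds apart) rather than read from the sockets.

```scala
import java.sql.Timestamp

object ExpectedLeftOuterJoin {
  case class S9999(id99: Int, timestamp99: Timestamp)
  case class S9998(id98: Int, timestamp98: Timestamp)

  // Same predicate as the join expression in the question:
  // id99 = id98 AND timestamp99 <= timestamp98 <= timestamp99 + 6 seconds
  def matches(l: S9998, r: S9999): Boolean = {
    val t98 = l.timestamp98.getTime
    val t99 = r.timestamp99.getTime
    l.id98 == r.id99 && t98 >= t99 && t98 <= t99 + 6000L
  }

  // Batch-style left outer join: every left row is kept,
  // paired with None when no right row satisfies the predicate.
  def leftOuterJoin(left: Seq[S9998], right: Seq[S9999]): Seq[(S9998, Option[S9999])] =
    left.flatMap { l =>
      val hits = right.filter(r => matches(l, r))
      if (hits.isEmpty) Seq(l -> Option.empty[S9999])
      else hits.map(r => l -> Option(r))
    }

  // Rebuild the sample rows: id n carries the timestamp 18:50:20.123 + (n - 1) * 5 seconds.
  private val start = Timestamp.valueOf("2011-10-02 18:50:20.123").getTime
  private def tsFor(id: Int): Timestamp = new Timestamp(start + (id - 1) * 5000L)

  val left: Seq[S9998] = (1 to 15).map(id => S9998(id, tsFor(id)))
  val right: Seq[S9999] = Seq(1, 3, 7, 8, 9, 13, 14, 15).map(id => S9999(id, tsFor(id)))

  def main(args: Array[String]): Unit =
    leftOuterJoin(left, right).foreach(println)
}
```

On this data, all 15 left rows should come back: ids 1, 3, 7, 8, 9, 13, 14 and 15 paired with their right-side row, and ids 2, 4, 5, 6, 10, 11 and 12 with an empty right side. Those seven unmatched rows are the ones missing from the streaming output.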
1 answer
fv2wmkja1#
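After spending 6 hours on this problem, I found that the watermark on the left side, which is supposed to be optional, is actually mandatory. In other words, the left stream (s9998Dataset) also needs a withWatermark call before the join for the unmatched left rows to be emitted.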
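A minimal sketch of that change, under a few assumptions: the 30-second delay on the left stream simply mirrors the right stream and may not suit real data, and the object name WatermarkedLeftOuterJoin and the helpers parse and socketLines are illustrative names that do not appear in the question.

```scala
import java.sql.Timestamp

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.expr

case class S9999(id99: Int, timestamp99: Timestamp)
case class S9998(id98: Int, timestamp98: Timestamp)

object WatermarkedLeftOuterJoin {

  // Parse one "id,timestamp" line received from a socket.
  def parse(line: String): (Int, Timestamp) = {
    val fields = line.split(",")
    (fields(0).trim.toInt, Timestamp.valueOf(fields(1).trim))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("StreamStream")
      .master("local")
      .getOrCreate()
    import spark.implicits._
    spark.sparkContext.setLogLevel("ERROR")

    // Read raw text lines from a local socket.
    def socketLines(port: Long): Dataset[String] =
      spark.readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", port)
        .load()
        .as[String]

    // Right stream, watermarked as in the question.
    val right: Dataset[S9999] = socketLines(9999L)
      .map { l => val (id, ts) = parse(l); S9999(id, ts) }
      .withWatermark("timestamp99", "30 seconds")

    // Left stream: the added watermark is the only functional change.
    // The 30-second delay is an assumption, not a recommended value.
    val left: Dataset[S9998] = socketLines(9998L)
      .map { l => val (id, ts) = parse(l); S9998(id, ts) }
      .withWatermark("timestamp98", "30 seconds")

    val joined = left.join(
      right,
      expr(
        """
        id99 = id98 AND
        timestamp98 >= timestamp99 AND
        timestamp98 <= timestamp99 + interval 6 seconds
        """),
      "leftOuter")

    joined.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
  }
}
```

Even with both watermarks in place, the unmatched left rows are not expected to show up immediately: outer results are emitted only once the watermark has moved past the point where a match is still possible, and in the micro-batch engine that typically requires newer data to arrive and trigger another batch.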