在spark结构化流媒体中,如何比较静态Dataframe和流式Dataframe?

s3fp2yjn  于 2021-05-18  发布在  Spark
关注(0)|答案(1)|浏览(479)

我需要比较两个Dataframe。其中一个是静态的,另一个是流式的。示例静态Dataframe如下所示:

id, value
2786,  5
7252,  3
2525,  4
8038,  1

流式Dataframe示例如下所示:

id, value
2786,  9
7252,  8
2525,  7

结果Dataframe应如下所示:

id, value
8038, 1

价值根本不重要。我只需要找到,对于这个小批量,我没有指定id为8038的值。我尝试使用joins和subtract()函数,但问题是流-静态连接不支持我需要的连接类型,当静态Dataframe位于左侧时,subtract不起作用。例如,这些表达式将返回错误:

staticDF.subtract(streamingDF)
staticDF.join(streamingDF, staticDF.id = streamingDF.id, "left_anti")

有没有什么方法可以得到staticdf中的id,而不是spark structured streaming中的streamingdf中的id?

bvhaajcl

bvhaajcl1#

您可以使用foreachbatch sink,然后对静态Dataframe和微批处理使用left-anti-join。

streamingDf.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  println("------------------------")
  println("Batch "+batchId+ " data")
  println("Total Records " + batchDF.count())
  println("------------------------")
  staticDf.join(batchDF, staticDf("id") === batchDF("id"),"left_anti")
    .select(staticDf("*")).show()

//You can also write your output using any writer
//e.g. df.write.format("csv").save("src/test/resources")

}.start()

输入:

static df
+----+-----+
|  id|value|
+----+-----+
|2786|    5|
|7252|    3|
|2525|    4|
|8038|    1|
+----+-----+

streaming batch 0
2786,9
7252,8
2525,7

streaming batch 1
2786,9
7252,8

输出:

------------------------
Batch 0 data
Total Records 3
------------------------
+----+-----+
|  id|value|
+----+-----+
|8038|    1|
+----+-----+

------------------------
Batch 1 data
Total Records 2
------------------------
+----+-----+
|  id|value|
+----+-----+
|2525|    4|
|8038|    1|
+----+-----+

相关问题