如何将微批量数据聚合成一个Dataframe,用于spark结构化流媒体?

xyhw6mcr  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(359)

用例如下。基于spark结构化流,Kafka数据被摄取。我们希望每10秒一次的微批数据都能被处理并聚合到一个Dataframe中,这个Dataframe不断地监视每个id的一些值的总和。下面的方法正确吗?似乎每个查询的“监视表”中仍然有以前微批中的旧数据,这导致了不需要的聚合。解决这个问题的最好办法是什么?谢谢!

val monitoring_stream = df.writeStream
                              .outputMode("append")
                              .format("memory")
                              .queryName("monitoring_table")
                              .start()

      var rounds = 0

      while(monitoring_stream.isActive) {
          Thread.sleep(10000)

          spark.sql("SELECT * from monitoring_table").show()   
          var tempDF = spark.sql("SELECT * from monitoring_table")
          var batchDF_group = tempDF.withWatermark("timestamp", "10 seconds").groupBy("id").sum("download_volume", "upload_volume").withColumnRenamed("sum(download_volume)","total_download_volume_batch").withColumnRenamed("sum(upload_volume)","total_upload_volume_batch")
          monitoring_df = monitoring_df.join(batchDF_group, monitoring_df("id") === batchDF_group("id"), "left").select(monitoring_df("id"), monitoring_df("total_download_volume"), monitoring_df("upload_volume"), monitoring_df("total_volume"), batchDF_group("total_download_volume_batch"), batchDF_group("total_upload_volume_batch")).na.fill(0)
          monitoring_df = monitoring_df.withColumn("total_upload_volume", monitoring_df("total_upload_volume")+monitoring_df("total_upload_volume_batch"))
          monitoring_df = monitoring_df.withColumn("total_download_volume", monitoring_df("total_download_volume")+monitoring_df("total_download_volume_batch"))
          monitoring_df = monitoring_df.withColumn("total_volume", monitoring_df("total_download_volume")+monitoring_df("total_upload_volume"))                                    

          monitoring_df.show()    
    }```
cngwdvgl

cngwdvgl1#

我使用databricks已经有一段时间了,在流媒体方面对我有帮助的一件事是-
databricks三角洲湖
使databricks delta lake与此相关的特性是——它对多个流(或并发批处理作业)保持“精确一次”处理。
您可以阅读上面的文档,如果没有databricks delta lake,那么您可以在您的项目中包含其中的一些功能。

相关问题