pyspark 无法将窗口化查询流数据接收到MongoDB

jaql4c8m  于 2023-03-01  发布在  Spark
关注(0)|答案(1)|浏览(89)

使用Spark Structured Streaming我尝试将流数据导入MongoDB集合。问题是我使用窗口查询数据,如下所示:

def basicAverage(df): 
  return df.groupby(window(col('timestamp'), "1 hour", "5 minutes"), col('stationcode')) \
  .agg(avg('mechanical').alias('avg_mechanical'), avg('ebike').alias('avg_ebike'),  avg('numdocksavailable').alias('avg_numdocksavailable'))

mongodb集成似乎不支持包含窗口数据的writeStream,因为当我运行脚本时,我的集合保持为空,没有显示任何错误。我试图删除查询中的窗口选项,接收器工作得很好。
下面是我的sink方法:

queryBasicAvg.writeStream.format('mongodb').queryName("basicAvg") \
  .option("checkpointLocation", "./tmp/pyspark7/").option("forceDeleteTempCheckpointLocation", "true") \
  .option('spark.mongodb.connection.uri', 'mongodb://127.0.0.1') \
  .option("spark.mongodb.database", 'velibprj').option("spark.mongodb.collection", 'stationsBasicAvg') \
  .outputMode("append").start()

有没有想过如何解决这个问题?
先谢了

4ioopgfo

4ioopgfo1#

我使用foreach()sink方法解决了这个问题。

相关问题