pyspark 如何在Google Colab中显示流式计算?

rn0zuynd  于 2023-05-28  发布在  Spark
关注(0)|答案(1)|浏览(149)

我开始学习Pyspark。因此,在一个场景中,我正在测试是否可以使用Gdrive作为流数据的源。我将把csv文件一个接一个,代码将监视文件,并在此基础上产生聚合。
下面是我的代码:

from google.colab import drive
drive.mount('/content/drive')

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
from pyspark.sql.types import StructType,StructField,IntegerType,StringType
schema=StructType(
                [
                StructField('File',StringType(),True),
                StructField('Shop',StringType(),True),
                StructField('Sales',IntegerType(),True)
                ]
                )

df=spark.readStream.format("csv").schema(schema).option("header" ,True).option("sep",",").load("/content/drive/My Drive/Pyspark/")
df=df.groupby("shop").sum("sales")
# df.show()


df.writeStream.format("console").outputMode("update").start().awaitTermination()

我想在colab中显示聚合的输出。但是,它不显示任何输出。
有人能提出一些解决方案吗?

nr7wwzry

nr7wwzry1#

一种简单的方法是将结果输出到一个临时的内存中可查询表,在该表中可以更好地检查结果。
使用控制台输出,除了笔记本和其他终端缓冲输出问题之外,您还需要处理无法查询、过滤等的结果的复杂文本转储。
在你的情况下,它会是这样的:

query = df \
    .writeStream \ 
    .outputMode("update") \
    .queryName("aggregates") \
    .format("memory") \
    .start()

请注意,我已经删除了awaitTermination()方法,这样当Spark Streaming应用程序在后台运行时,您可以释放您的笔记本单元以在运行时执行分析。
然后你可以简单地查询aggregates表,如下所示:

spark.sql("select * from aggregates").show(truncate=False)

+-----+------------------+
|shop |sum(sales)        |
+-----+------------------+
|shop1|72312             |
|shop2|92333             |
|shop3|142313            |
|shop4|321123            |
+-----+------------------+

相关问题