pyspark用于多个数据文件

xpcnnkqh  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(280)

我想用pyspark并行处理几个类似大小(100mb)的idependent csv文件。我在一台机器上运行pyspark:spark.driver.memory 2g spark.executor.memory 2g local[4]
文件内容:类型(在每个csv中具有相同的值)、时间戳、价格
首先我在一个csv上测试了它:

logData = spark.read.csv("TypeA.csv", header=False,schema=schema)
    // Compute moving avg.
    w = (Window.partitionBy("type").orderBy(f.col("timestamp").cast("long")).rangeBetween(-24*7*3600 * i, 0))
    logData = logData.withColumn("moving_avg", f.avg("price").over(w))
    // Some other simple operations... No Agg, no sort
    logData.write.parquet("res.pr")

这很有效。但是,当我尝试在23个文件上运行它时:

logData = spark.read.csv('TypeB*.csv', header=False,schema=schema)

作业失败,堆为空。将内存增加到10g会有帮助(5g和7g失败),但是它不能扩展,因为我需要处理600个文件。
问题是为什么pyspark不会溢出到磁盘以防止oom,或者我如何提示spark这样做?
我想另一种解决方案是按顺序读取文件,然后用spark逐个处理它们(但这可能会比较慢?)。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题