我有一个大的csv文件与相同的模式一串。我想合并这些文件,并将结果写入一个按列划分的Parquet文件 file_name
.
以下是我迄今为止所做的:
children_files = hdfs.list("/landing/my_data/children_flow/")
df = spark.createDataFrame(
spark.sparkContext.emptyRDD(),
dfSchema
)
df = df.withColumn("file_name", lit(None))
for one_file in children_files :
df2 = spark.read.csv(os.path.join("/landing/my_data/children_flow/", one_file))
df2 = df2.withColumn("file_name", lit(one_file.replace(".csv", "")))
df = df.union(df2)
df.write.partitionBy("file_name").parquet("/staging/my_data/children_flow/")
问题是我越来越 java.lang.OutOfMemoryError: Java heap space
错误。
我试着做一个
df.repartition(3000, "file_name").write.partitionBy("file_name").parquet("/staging/my_data/children_flow/")
但也不管用。你能提出一个解决办法吗?
1条答案
按热度按时间xmjla07d1#
看起来hdfs位置中的文件太多了。因为有那么多文件
union
作为一个转变而不是一个行动的Spark,在试图建立物理计划时可能会耗尽记忆。在任何情况下,如果您计划读取大量具有相同模式的csv文件,我将使用结构化流。为了避免手动编写模式,可以从示例文件中推断出来。
下面的代码显示了这个想法:
如果内存再次耗尽,可以将选项“maxfilespertrigger”设置为一个合适的数字。如果它创建了太多的Parquet文件,您最终可以只读取->重新分区->写入一次Parquet数据。