pyspark: Reading parquet files from Azure Data Lake in Databricks causes java.lang.OutOfMemoryError: Java heap space

jaql4c8m · Posted 2023-11-16 in Spark

We have an Azure Databricks notebook that reads parquet data files from our "RAW" folder, which is streamed from Azure EventHub into Azure Data Lake in Parquet format and partitioned by year/month/day; a single day's folder can contain 10,000+ parquet files of 20 KB to 6 MB each.
Example file path: RAW/ConsumptionData/Parquet/V1/CapturedYear=2023/CapturedMonth=10/CapturedDay=30/137341728_872c9ee8bbaf4c58b8ee7a401f459b41_1.parquet
This has worked for a while, but it has now started failing with

java.lang.OutOfMemoryError: Java heap space

So we need to adjust something, whether in the code, the partitioning, or the cluster specs.
The notebook consists of the following basic statements:

# Read the data with a defined schema
consFromRAW = ( spark
    .read
    .option("header","True")
    .schema(schema)
    .parquet( "/mnt/datalakegen2/RAW/ConsumptionData/Parquet/V1")
)


Even though Spark is lazy and this statement shouldn't execute anything, it takes a very long time and now ends with the memory error. (The stack trace below shows why: spark.read.parquet eagerly builds an InMemoryFileIndex, i.e. the driver lists every leaf file under the path before any query runs.) It never reaches the next statement, where we do our filtering:

from pyspark.sql.functions import col

# Filter down to the exact date partitions needed
consFromRAWFiltered = consFromRAW.filter(col("CapturedDate") >= firstRAWReadDate)


where firstRAWReadDate is typically NOW() minus 2 days, and CapturedDate is a date assembled from the CapturedYear, CapturedMonth and CapturedDay folder columns.
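
For reference, here is a minimal sketch of how that filter might be assembled; the exact derivation of firstRAWReadDate and CapturedDate is not shown in the thread, so the details below are assumptions:

from datetime import date, timedelta
from pyspark.sql.functions import col, expr

# Assumption: the read window starts two days back
firstRAWReadDate = date.today() - timedelta(days=2)

# Assumption: CapturedDate is built from the partition columns
# via the SQL make_date function (available since Spark 3.0)
consWithDate = consFromRAW.withColumn(
    "CapturedDate",
    expr("make_date(CapturedYear, CapturedMonth, CapturedDay)"),
)
consFromRAWFiltered = consWithDate.filter(col("CapturedDate") >= firstRAWReadDate)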
Some details about the cluster:
Driver: Standard_DS3_v2 · Workers: Standard_DS3_v2, 2-16 workers · Databricks Runtime 10.4 LTS (includes Apache Spark 3.2.1, Scala 2.12)
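
Since the stack trace points at the driver rather than the executors, the driver heap is one of the cluster-spec levers mentioned above. A small sketch for checking what the driver is currently configured with from inside the notebook (on Databricks the effective heap is largely determined by the driver node type, so scaling up the driver VM is the usual route):

# Print the driver memory setting, if one is set on this cluster
print(spark.sparkContext.getConf().get("spark.driver.memory", "<not set>"))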

Error details:

java.lang.OutOfMemoryError: Java heap space

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-3957504232682417> in <module>
     72 
     73     # Read the data with the defined schema
---> 74     consFromRAW = ( spark
     75         .read
     76         .option("header","True")

/databricks/spark/python/pyspark/sql/readwriter.py in parquet(self, *paths, **options)
    299                        int96RebaseMode=int96RebaseMode)
    300 
--> 301         return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
    302 
    303     def text(self, paths, wholetext=False, lineSep=None, pathGlobFilter=None,

/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    115     def deco(*a, **kw):
    116         try:
--> 117             return f(*a, **kw)
    118         except py4j.protocol.Py4JJavaError as e:
    119             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o533.parquet.
: java.lang.OutOfMemoryError: Java heap space
    at scala.collection.mutable.HashTable.resize(HashTable.scala:258)
    at scala.collection.mutable.HashTable.addEntry0(HashTable.scala:158)
    at scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:170)
    at scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167)
    at scala.collection.mutable.LinkedHashSet.findOrAddEntry(LinkedHashSet.scala:44)
    at scala.collection.mutable.LinkedHashSet.add(LinkedHashSet.scala:68)
    at scala.collection.mutable.LinkedHashSet.$plus$eq(LinkedHashSet.scala:63)
    at scala.collection.mutable.LinkedHashSet.$plus$eq(LinkedHashSet.scala:44)
    at scala.collection.generic.Growable.$anonfun$$plus$plus$eq$1(Growable.scala:62)
    at scala.collection.generic.Growable$$Lambda$10/936292831.apply(Unknown Source)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.AbstractSet.$plus$plus$eq(Set.scala:50)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.$anonfun$listLeafFiles$2(InMemoryFileIndex.scala:144)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$Lambda$3969/1659440957.apply(Unknown Source)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:140)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:102)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:74)
    at org.apache.spark.sql.execution.datasources.DataSource.createInMemoryFileIndex(DataSource.scala:623)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:458)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:356)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:323)
    at org.apache.spark.sql.DataFrameReader$$Lambda$3794/1655878321.apply(Unknown Source)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:323)
    at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:740)

mzsu5hc0 · Answer #1

The error you are hitting (java.lang.OutOfMemoryError: Java heap space) comes from trying to index a very large number of Parquet files in one go, which can exhaust memory. Consider Delta Lake, which is designed to optimize data lake storage and processing: if your data is stored in Delta Lake format, reading it can be more memory-efficient, since Delta tracks the table's files in its transaction log instead of listing the whole directory tree. You can read the data with format("delta") instead of parquet, like this:

# Read the table at the mounted path as Delta and display a sample
path = "/mnt/files/RAW/"
df = spark.read.format("delta").load(path)
df.show()

This loads the entire dataset along with its partition columns.
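
Note that format("delta") only works against an actual Delta table, so the existing parquet directory would need to be converted first. A minimal sketch, assuming the path and partition layout from the question and using Databricks' CONVERT TO DELTA (run once; afterwards, reads go through the Delta transaction log, and filters on the partition columns prune files instead of listing the whole tree):

# One-time, in-place conversion of the parquet directory to Delta.
# The partition columns must be declared explicitly.
spark.sql("""
  CONVERT TO DELTA parquet.`/mnt/datalakegen2/RAW/ConsumptionData/Parquet/V1`
  PARTITIONED BY (CapturedYear INT, CapturedMonth INT, CapturedDay INT)
""")

# Read it back as Delta, filtering on the partition columns
df = (spark.read.format("delta")
      .load("/mnt/datalakegen2/RAW/ConsumptionData/Parquet/V1")
      .filter("CapturedYear = 2023 AND CapturedMonth = 10 AND CapturedDay >= 28"))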
