We have an Azure Databricks notebook that reads Parquet data files from our "RAW" folder, which is streamed from Azure Event Hub into Azure Data Lake in Parquet format and partitioned by year/month/day, where a single day's folder can contain 10,000+ Parquet files of 20 KB to 6 MB each.
Example file path: RAW/ConsumptionData/Parquet/V1/CapturedYear=2023/CapturedMonth=10/CapturedDay=30/137341728_872c9ee8bbaf4c58b8ee7a401f459b41_1.parquet
This has been working for a while, but it has now started to fail with
java.lang.OutOfMemoryError: Java heap space
so we need to adjust something, either in the code, the partitioning, or the cluster specification.
The notebook consists of the following basic statements:
# Read the data with a defined schema
consFromRAW = ( spark
    .read
    .option("header","True")
    .schema(schema)
    .parquet("/mnt/datalakegen2/RAW/ConsumptionData/Parquet/V1")
)
Even though Spark is lazy and should not execute anything for this statement, it takes a very long time, now ends with the memory error, and never reaches the next statement where we do our filtering:
# Filter the exact date partitions needed
consFromRAWFiltered = consFromRAW.filter(col("CapturedDate") >= firstRAWReadDate)
where firstRAWReadDate is typically NOW() minus 2 days, and CapturedDate is a date built by concatenating the CapturedYear, CapturedMonth and CapturedDay folder values.
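For reference, a minimal sketch of how that cut-off date and the derived partition-date column could be built; the make_date/current-date logic is an assumption, since the actual notebook code for this step is not shown:
from datetime import date, timedelta
from pyspark.sql.functions import col, lit, make_date

# Assumed definition: only read data captured in the last two days
firstRAWReadDate = date.today() - timedelta(days=2)

# Assumed derivation of CapturedDate from the partition folder columns
consFromRAWFiltered = (
    consFromRAW
    .withColumn("CapturedDate",
                make_date(col("CapturedYear"), col("CapturedMonth"), col("CapturedDay")))
    .filter(col("CapturedDate") >= lit(firstRAWReadDate))
)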
Some details about the cluster:
Driver: Standard_DS3_v2, Workers: Standard_DS3_v2, 2-16 workers, Runtime 10.4 LTS (includes Apache Spark 3.2.1, Scala 2.12)
Error details:
java.lang.OutOfMemoryError: Java heap space
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<command-3957504232682417> in <module>
72
73 # Read the data with the defined schema
---> 74 consFromRAW = ( spark
75 .read
76 .option("header","True")
/databricks/spark/python/pyspark/sql/readwriter.py in parquet(self, *paths, **options)
299 int96RebaseMode=int96RebaseMode)
300
--> 301 return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
302
303 def text(self, paths, wholetext=False, lineSep=None, pathGlobFilter=None,
/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
115 def deco(*a, **kw):
116 try:
--> 117 return f(*a, **kw)
118 except py4j.protocol.Py4JJavaError as e:
119 converted = convert_exception(e.java_exception)
/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o533.parquet.
: java.lang.OutOfMemoryError: Java heap space
at scala.collection.mutable.HashTable.resize(HashTable.scala:258)
at scala.collection.mutable.HashTable.addEntry0(HashTable.scala:158)
at scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:170)
at scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167)
at scala.collection.mutable.LinkedHashSet.findOrAddEntry(LinkedHashSet.scala:44)
at scala.collection.mutable.LinkedHashSet.add(LinkedHashSet.scala:68)
at scala.collection.mutable.LinkedHashSet.$plus$eq(LinkedHashSet.scala:63)
at scala.collection.mutable.LinkedHashSet.$plus$eq(LinkedHashSet.scala:44)
at scala.collection.generic.Growable.$anonfun$$plus$plus$eq$1(Growable.scala:62)
at scala.collection.generic.Growable$$Lambda$10/936292831.apply(Unknown Source)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.AbstractSet.$plus$plus$eq(Set.scala:50)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.$anonfun$listLeafFiles$2(InMemoryFileIndex.scala:144)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$Lambda$3969/1659440957.apply(Unknown Source)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:140)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:102)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:74)
at org.apache.spark.sql.execution.datasources.DataSource.createInMemoryFileIndex(DataSource.scala:623)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:458)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:356)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:323)
at org.apache.spark.sql.DataFrameReader$$Lambda$3794/1655878321.apply(Unknown Source)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:323)
at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:740)
1 Answer
The error you are getting (java.lang.OutOfMemoryError: Java heap space) comes from trying to read a very large number of Parquet files into a DataFrame at once, which can cause memory problems. You could use Delta Lake, since it is designed to optimize data lake storage and processing. If your data is stored in Delta Lake format, reading it is likely to be more memory-efficient. You can try reading the data with format("delta") instead of parquet.
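A minimal sketch of what that read could look like, assuming the data has first been converted to a Delta table (the Delta path below is hypothetical):
# Read the data as a Delta table instead of raw Parquet files
# (path is hypothetical; the data must already be stored in Delta format)
consFromRAW = ( spark
    .read
    .format("delta")
    .load("/mnt/datalakegen2/RAW/ConsumptionData/Delta/V1")
)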
This will load the entire dataset together with its partitions.