PySpark Azure Databricks Auto Loader: BinaryFile option with foreach() gives java.lang.OutOfMemoryError: Java heap space

Asked by myzjeezk on 2023-03-28 in Spark

I am trying to copy a file from one location to another using the BinaryFile option and foreach(copy) in Auto Loader. It runs well with smaller files (up to 150 MB) but fails with bigger files, throwing the exception below:

22/09/07 10:25:51 INFO FileScanRDD: Reading File path: dbfs:/mnt/somefile.csv, range: 0-1652464461, partition values: [empty row], modificationTime: 1662542176000.
22/09/07 10:25:52 ERROR Utils: Uncaught exception in thread stdout writer for /databricks/python/bin/python
java.lang.OutOfMemoryError: Java heap space
    at org.apache.spark.sql.catalyst.expressions.UnsafeRow.getBinary(UnsafeRow.java:416)
    at org.apache.spark.sql.catalyst.expressions.SpecializedGettersReader.read(SpecializedGettersReader.java:75)
    at org.apache.spark.sql.catalyst.expressions.UnsafeRow.get(UnsafeRow.java:333)
    at org.apache.spark.sql.execution.python.EvaluatePython$.toJava(EvaluatePython.scala:58)
    at org.apache.spark.sql.execution.python.PythonForeachWriter.$anonfun$inputByteIterator$1(PythonForeachWriter.scala:43)
    at org.apache.spark.sql.execution.python.PythonForeachWriter$$Lambda$1830/1643360976.apply(Unknown Source)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:92)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:82)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:82)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:442)
    at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:871)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:573)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$Lambda$2008/2134044540.apply(Unknown Source)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2275)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:365)
22/09/07 10:25:52 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout writer for /databricks/python/bin/python,5,main]
java.lang.OutOfMemoryError: Java heap space
    at org.apache.spark.sql.catalyst.expressions.UnsafeRow.getBinary(UnsafeRow.java:416)
    ... (same stack trace as above)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:365)

Below is the high-level code snippet for reference:

The cluster has 2 workers and 1 driver, each with 14 GB RAM and 4 cores.

cloudfile_options = {
    "cloudFiles.subscriptionId": subscription_ID,
    "cloudFiles.connectionString": queue_SAS_connection_string,
    "cloudFiles.format": "BinaryFile",
    "cloudFiles.tenantId": tenant_ID,
    "cloudFiles.clientId": client_ID,
    "cloudFiles.clientSecret": client_secret,
    "cloudFiles.useNotifications": "true"
}

import shutil

def copy(row):
    # Called once per input row; with binaryFile each row also carries the full file contents.
    source = row['path']
    destination = "somewhere"
    shutil.copy(source, destination)

(spark.readStream.format("cloudFiles")
    .options(**cloudfile_options)
    .load(storage_input_path)
    .writeStream
    .foreach(copy)
    .option("checkpointLocation", checkpoint_location)
    .trigger(once=True)
    .start())

I also tested shutil.copy with huge files (20 GB) outside foreach() and it works seamlessly.
Any leads on this would be much appreciated 😊


zfycwa2u1#

This happens because you are passing the entire row to foreach(), and with the BinaryFile format that row includes the file contents, which then have to be serialized from the JVM to Python. If all you are doing is copying the file, just add .select("path") before .writeStream so that only the file path is passed to Python, not the contents:
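A minimal sketch of the adjusted query, reusing the option dictionary and variable names from the question; the only change is the added .select("path"):

# Select only the "path" column so the binary "content" column never has to
# cross the JVM-to-Python boundary for the foreach writer.
(spark.readStream.format("cloudFiles")
    .options(**cloudfile_options)
    .load(storage_input_path)
    .select("path")          # drop "content" and the other metadata columns
    .writeStream
    .foreach(copy)
    .option("checkpointLocation", checkpoint_location)
    .trigger(once=True)
    .start())

With only the path flowing to the Python foreach writer, the JVM no longer materializes each file's bytes for serialization to Python, which is exactly the step that was exhausting the heap in the stack trace above.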
