pyspark - Buffer/capacity error when saving a DataFrame as Parquet in Spark

wkyowqbh · posted 2022-11-01 in Spark

I am having a problem writing a DataFrame to Hadoop as a partitioned Parquet file with PySpark.
This works fine:

salesDfSpark.write.option("header",True) \
        .partitionBy("Country") \
        .mode("overwrite") \
        .csv("hdfs://master:9000/sales/{}_{}.csv".format(csvName,epochNow)) #Hadoop Namenode at port 9000
print("Sales Dataframe stored in Hadoop.")

This does not work:

salesDfSpark.write.option("header",True) \
        .partitionBy("Country") \
        .mode("overwrite") \
        .parquet("hdfs://master:9000/sales/{}_{}.parquet".format(csvName,epochNow)) #Hadoop Namenode at port 9000
print("Sales Dataframe stored in Hadoop.")

Error with OpenJDK 8:

Caused by: java.lang.IllegalArgumentException
    at java.nio.Buffer.limit(Buffer.java:275)
    at org.xerial.snappy.Snappy.compress(Snappy.java:156)

Error with OpenJDK 11:

Caused by: java.lang.IllegalArgumentException: newLimit > capacity: (80 > 76)
    at java.base/java.nio.Buffer.createLimitException(Buffer.java:372)

Apart from switching the Java version, I have also tested snappy-java-1.1.8.4 and snappy-java-1.1.4, with the same result. Does anyone have experience with this problem?
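One way to check whether the failure really sits in the Snappy code path is to retry the write with a different Parquet codec. This is only a diagnostic sketch; gzip is chosen here arbitrarily, any non-Snappy codec would do:

salesDfSpark.write.option("compression", "gzip") \
        .partitionBy("Country") \
        .mode("overwrite") \
        .parquet("hdfs://master:9000/sales/{}_{}.parquet".format(csvName, epochNow)) # non-Snappy codec, diagnostic only

If that write succeeds, the problem is isolated to the Snappy native library rather than to the Parquet writer itself.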
Edit:
This produces the same error:

salesDfSpark.write.saveAsTable('salesDfSpark')

Output:

2022-06-12 10:35:29,468 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 7.0 (TID 1184) (worker2 executor 0): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:500)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:321)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: newLimit > capacity: (144 > 112)
    at java.base/java.nio.Buffer.createLimitException(Buffer.java:372)
    at java.base/java.nio.Buffer.limit(Buffer.java:346)
    at java.base/java.nio.ByteBuffer.limit(ByteBuffer.java:1107)
    at java.base/java.nio.MappedByteBuffer.limit(MappedByteBuffer.java:235)
    at java.base/java.nio.MappedByteBuffer.limit(MappedByteBuffer.java:67)
    at org.xerial.snappy.Snappy.compress(Snappy.java:156)
    at org.apache.parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:78)
    at org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81)
    at org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)
    at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:167)
    at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:168)
    at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:59)
    at org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:387)
    at org.apache.parquet.column.impl.ColumnWriteStoreBase.flush(ColumnWriteStoreBase.java:186)
    at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:29)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:185)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:124)
    at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:164)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:41)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseCurrentWriter(FileFormatDataWriter.scala:64)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:75)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:105)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:305)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1496)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:311)
    ... 9 more

gwo2fgha1#

I had the same error. The solution turned out to have little to do with the error message, but the following worked for me.
In my case, the Java temp directory /tmp was mounted on the server with the noexec flag. The snappy-java library could not extract and execute its native library in that temp directory and threw the stack trace above because of those mount permissions.
The solution is to point it at another directory with execute permission via the -Dorg.xerial.snappy.tempdir system property:

spark-submit --conf "spark.driver.extraJavaOptions=-Dorg.xerial.snappy.tempdir=/my_user/temp_folder" my_app.py
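Since the stack trace above shows the compression failing in an executor task (worker2, executor 0), the same system property most likely needs to reach the executor JVMs as well, e.g. via spark.executor.extraJavaOptions. A minimal sketch of setting it from PySpark, assuming /my_user/temp_folder also exists and is executable on the worker nodes:

from pyspark.sql import SparkSession

# Sketch: executors launch after the session is created, so their JVM flag can
# be set here; the driver side is covered by the spark-submit option above.
# /my_user/temp_folder is a placeholder path and must exist with execute
# permission on every worker node.
spark = SparkSession.builder \
        .config("spark.executor.extraJavaOptions",
                "-Dorg.xerial.snappy.tempdir=/my_user/temp_folder") \
        .getOrCreate()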

You can also check for the noexec flag (for example on /tmp) with:

findmnt -l
