I'm running into a problem when writing a PySpark DataFrame to Hadoop as partitioned Parquet files.
This works fine:
salesDfSpark.write.option("header",True) \
.partitionBy("Country") \
.mode("overwrite") \
.csv("hdfs://master:9000/sales/{}_{}.csv".format(csvName,epochNow)) #Hadoop Namenode at port 9000
print("Sales Dataframe stored in Hadoop.")
This does not work:
salesDfSpark.write.option("header",True) \
.partitionBy("Country") \
.mode("overwrite") \
.parquet("hdfs://master:9000/sales/{}_{}.parquet".format(csvName,epochNow)) #Hadoop Namenode at port 9000
print("Sales Dataframe stored in Hadoop.")
Error with OpenJDK 8:
Caused by: java.lang.IllegalArgumentException
at java.nio.Buffer.limit(Buffer.java:275)
at org.xerial.snappy.Snappy.compress(Snappy.java:156)
Error with OpenJDK 11:
Caused by: java.lang.IllegalArgumentException: newLimit > capacity: (80 > 76)
at java.base/java.nio.Buffer.createLimitException(Buffer.java:372)
Besides switching Java versions, I have also tested snappy-java-1.1.8.4 and snappy-java-1.1.4, all with the same result. Does anyone have experience with this problem?
Edit:
This produces the same error:
salesDfSpark.write.saveAsTable('salesDfSpark')
Output:
2022-06-12 10:35:29,468 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 7.0 (TID 1184) (worker2 executor 0): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:500)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:321)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: newLimit > capacity: (144 > 112)
at java.base/java.nio.Buffer.createLimitException(Buffer.java:372)
at java.base/java.nio.Buffer.limit(Buffer.java:346)
at java.base/java.nio.ByteBuffer.limit(ByteBuffer.java:1107)
at java.base/java.nio.MappedByteBuffer.limit(MappedByteBuffer.java:235)
at java.base/java.nio.MappedByteBuffer.limit(MappedByteBuffer.java:67)
at org.xerial.snappy.Snappy.compress(Snappy.java:156)
at org.apache.parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:78)
at org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81)
at org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)
at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:167)
at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:168)
at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:59)
at org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:387)
at org.apache.parquet.column.impl.ColumnWriteStoreBase.flush(ColumnWriteStoreBase.java:186)
at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:29)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:185)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:124)
at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:164)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:41)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseCurrentWriter(FileFormatDataWriter.scala:64)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:75)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:105)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:305)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1496)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:311)
... 9 more
1 Answer
I had the same error. The solution is almost unrelated to the error message, but the following worked for me.
In my case, the Java temporary directory /tmp was mounted on the server with the noexec flag. The snappy-java library extracts its native library into the temp directory and runs it from there; because of those mount options it could not do so, and it failed with the stack trace above.
The solution is to point it at another directory that does have execute permission, via the "-Dorg.xerial.snappy.tempdir" configuration property.
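In Spark, the property has to reach both the driver and the executor JVMs, so setting it only in the session is not enough. A minimal sketch via spark-submit, assuming a hypothetical directory /opt/snappy-tmp that exists on every node and is mounted without noexec (any such directory works):

```shell
# Pass the snappy-java temp dir to driver and executors alike.
# /opt/snappy-tmp is a placeholder: pick any node-local directory
# on a filesystem that is NOT mounted with the noexec flag.
spark-submit \
  --conf "spark.driver.extraJavaOptions=-Dorg.xerial.snappy.tempdir=/opt/snappy-tmp" \
  --conf "spark.executor.extraJavaOptions=-Dorg.xerial.snappy.tempdir=/opt/snappy-tmp" \
  your_job.py
```

The same two `--conf` entries can instead go into spark-defaults.conf if every job on the cluster needs them.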
You can also check for the noexec flag with the following command:
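A sketch of that check; the answer's original command was cut off, so this is one standard way to inspect the mount options:

```shell
# Print the mount point and options for /tmp, falling back to the
# root filesystem (which /tmp inherits from when it is not a separate
# mount). "noexec" appearing in the options column is the problem.
mount | awk '$3 == "/tmp" || $3 == "/" { print $3, $6 }'
```

If the output contains noexec for the filesystem holding /tmp, snappy-java cannot execute its extracted native library there.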