Error importing large tables from Oracle into HDFS: "org.apache.spark.shuffle.FetchFailedException: Read error or truncated source"

mccptt67 · posted 2021-06-24 in Hive

I have been trying to copy two large tables from an Oracle DB into HDFS, using PySpark with JDBC to read the tables from the source and save them as Hive partitioned tables.
I have already managed to copy and save these tables into HDFS by reading via JDBC and writing straight to a Hive partitioned table.
The problem with that approach is that it creates a huge number of small files in every partition in HDFS. To avoid this, I tried repartitioning the data read from JDBC before writing it to HDFS, like this:

partition_cols = ["col1", "col2"]

# jdbc_url, key_col, min_val and max_val are defined elsewhere in my job
# (the connection string, plus the numeric column and its min/max values
# used to split the JDBC read into parallel partitions).
df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "(SELECT * FROM table) T") \
    .option("driver", "oracle.jdbc.driver.OracleDriver") \
    .option("user", "user") \
    .option("password", "password") \
    .option("numPartitions", 128) \
    .option("fetchsize", 32000) \
    .option("partitionColumn", key_col) \
    .option("lowerBound", min_val) \
    .option("upperBound", max_val) \
    .load()

# Shuffle so that all rows of a given (col1, col2) pair land in one task,
# then write the result as a Hive-partitioned Parquet table.
df = df.repartition(*partition_cols)
df.write.mode("overwrite").format("parquet") \
    .partitionBy(*partition_cols).saveAsTable("some_table")
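
My understanding of the extra step is that repartition(*partition_cols) hash-partitions the rows on (col1, col2) across spark.sql.shuffle.partitions tasks, so each Hive partition ends up written by a single task instead of by many. A variant I believe is equivalent, pinning the shuffle partition count explicitly, would look like this (just a sketch; 256 is an illustrative value, not what my job actually uses):

# Sketch: same write path, but with an explicit shuffle partition count.
df = df.repartition(256, *partition_cols)
df.write.mode("overwrite").format("parquet") \
    .partitionBy(*partition_cols).saveAsTable("some_table")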

When I run the repartition version above, I get the following error message:

org.apache.spark.shuffle.FetchFailedException: Read error or truncated source
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:470)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
    at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
    at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1363)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Read error or truncated source
    at com.github.luben.zstd.ZstdInputStream.read(ZstdInputStream.java:102)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    at java.io.FilterInputStream.read(FilterInputStream.java:107)
    at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:364)
    at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:351)
    at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:351)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1363)
    at org.apache.spark.util.Utils$.copyStream(Utils.scala:372)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:462)
    ... 26 more
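
The Caused by frames point at com.github.luben.zstd.ZstdInputStream, i.e. the read fails while decompressing fetched shuffle blocks, which are zstd-compressed in my setup. One experiment I may try, though I have not verified that it helps, is falling back to lz4, the Spark 2.4 default, when building the session (a sketch; "oracle_to_hdfs" is just an illustrative app name, and I believe spark.io.compression.codec is the setting that selects the block codec):

from pyspark.sql import SparkSession

# Sketch: build the session with lz4 block compression instead of zstd.
spark = SparkSession.builder \
    .appName("oracle_to_hdfs") \
    .config("spark.io.compression.codec", "lz4") \
    .getOrCreate()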

Any ideas about why this error happens are welcome. So far I have not found any useful information on this problem.
Spark 2.4.0
JDBC 8
Python 2.7
Hive 2.1.1
Hadoop 3.0.0
