I have been trying to copy two large tables from an Oracle database into HDFS. I use PySpark with JDBC to read the tables from the source and save them as Hive partitioned tables.
I had already managed to copy these tables into HDFS by writing the JDBC read directly into a Hive partitioned table.
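That direct write was essentially the same JDBC read shown further below, saved straight into the partitioned table with no repartition in between (a minimal sketch, not the exact code):

# df comes from the same spark.read.format("jdbc")...load() call shown below.
# With 128 JDBC read partitions and no repartition, every read partition can
# write its own file into every Hive partition directory -> many small files.
df.write.mode("overwrite") \
    .format("parquet") \
    .partitionBy("col1", "col2") \
    .saveAsTable("some_table")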
The problem with that approach is that it creates a large number of small files in every HDFS partition. To avoid this, I tried repartitioning the data read from JDBC before writing it to HDFS, like so:
partition_cols = ["col1", "col2"]

# Read the source table over JDBC, split into 128 parallel partitions on key_col
df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "(SELECT * FROM table) T") \
    .option("driver", "oracle.jdbc.driver.OracleDriver") \
    .option("user", "user") \
    .option("password", "password") \
    .option("numPartitions", 128) \
    .option("fetchsize", 32000) \
    .option("partitionColumn", key_col) \
    .option("lowerBound", min_val) \
    .option("upperBound", max_val) \
    .load()

# Shuffle by the Hive partition columns so each partition directory gets few, larger files
df = df.repartition(*partition_cols)
df.write.mode("overwrite") \
    .format("parquet") \
    .partitionBy(*partition_cols) \
    .saveAsTable("some_table")
When I run this repartitioned version, I get the following error:
org.apache.spark.shuffle.FetchFailedException: Read error or truncated source
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:470)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1363)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Read error or truncated source
at com.github.luben.zstd.ZstdInputStream.read(ZstdInputStream.java:102)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at java.io.FilterInputStream.read(FilterInputStream.java:107)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:364)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:351)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:351)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1363)
at org.apache.spark.util.Utils$.copyStream(Utils.scala:372)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:462)
... 26 more
Any ideas on why this error happens would be welcome. So far I have not found any useful information about this problem.
Spark 2.4.0
Oracle JDBC driver ojdbc8
Python 2.7
Hive 2.1.1
Hadoop 3.0.0