spark 2.3

aoyhnmkz  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(219)

我有 hive 外桌 MyTablepartition(load_date:String, load_type:String) 作为Parquet地板储存。功能 loadData(start,end) 是一个具有spark操作的 Package 函数 df.insertinto() 最后。使用输入作为调用此函数 loadData(20190101,20190103) 将在hdfs目录结构下创建

load_date=2019101/type=A
load_date=2019101/type=B
load_date=2019102/type=A
load_date=2019102/type=B
load_date=2019103/type=A
load_date=2019103/type=B

def loadData(start,end):
       .... 
       // dynamic insert for start to end date range
       df.write.mode(SaveMode.Overwrite).insertInto(MyTable) // for type A
       ....
       df.write.mode(SaveMode.Overwrite).insertInto(MyTable) // for Type B

注2 insertinto() 在上面的函数中,由于某些原因按顺序运行。在一个spark应用程序中,我旋转了多个spark作业,这些作业写入 Mytable 但每个作业都会写入一个单独的分区,比如-

val interval= [ (20190101,20190105), (20190106,20190110), (20190111,20190115), .....]

interval.grouped(3).foreach(grp => grp.par.foreach(slot => loadData(slot._1,slot._2))

3个作业按预期并行触发,应用程序成功完成。但是在输出hdfs位置,我看到一些随机分区丢失了。
例如, type=B 内部缺少分区 load_date=20191010120190105 ```
load_date=20190101/type=A
load_date=20190102/type=A
load_date=20190103/type=A
load_date=20190104/type=A
load_date=20190105/type=A
load_date=20190106/type=A
load_date=20190107/type=A
load_date=20190108/type=A
load_date=20190109/type=A
load_date=201901010/type=A
load_date=20190106/type=B
load_date=20190107/type=B
load_date=20190108/type=B
load_date=20190109/type=B
load_date=201901010/type=B

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题