我有一个要求,在这里我需要运行spark.sql(“altertableaddpartition”)为众多的列,范围从3000到120000不等,这取决于我的负载,并行计数至少为20个线程。
我试着并行运行循环,如下所示:
(1 to 10).par.foreach( list1Ele =>{
(1 to 120).par.foreach( list2Ele => {
List("abc","def","efg").par.foreach( list3Ele => {
sparl.sql(s"alter table mySource.myTable drop if exists partition(list1Ele = $list1Ele, list2Ele = '$list2Ele', list3Ele='$list3Ele')")
})})})
每次迭代需要15分钟。
第二种方法,我试了一下,似乎是错的如下:
将所有400个alter table命令堆栈到列表(mylist)中,然后,
val myRDD = sc.parallelize(Seq(MyList))
myRDD.map(command => spark.sql(command)).collect
这也是行不通的。请求一些提示。提前谢谢!!
致以最诚挚的问候,
拉胡尔·辛格·巴贾杰
1条答案
按热度按时间li9yvcax1#
我尝试了一些选项,但解决我的问题的方法是使用spark.sql对新复制的数据进行msck修复
如果您有任何建议,我应该尝试并行运行spark.sql(“”)。我试过期货,但对我没用。