我想创建一个SQLContext的 Package 器,它应该使下面的sql方法(sqlc.sql)在失败之前运行查询最多30次。对于每次失败的重试,Spark上下文应该关闭并重新启动。
from pyspark.sql import SQLContext
from pyspark import SparkContext
sc=SparkContext()
sqlc=SQLContext()
sqlc.sql("select * from table").write.format("parquet").mode("overwrite").saveAsTable("new_table")
目前我正在做如下
from pyspark.sql import SQLContext
from pyspark import SparkContext
global sc
global sqlc
sc=SparkContext()
sqlc=SQLContext()
for i in range(1,4):
try:
sqlc.sql("select * from table").write.format("parquet").mode("overwrite").saveAsTable("new_table")
break
exception e:
sc.stop()
sc=SparkContext()
sqlc=SQLContext(sc)
由于我需要将这些更改应用到数百个python脚本中,所以我希望在调用sqlc.sql方法时在内部应用此逻辑。
1条答案
按热度按时间drnojrws1#
最简单的方法可能是编写一个装饰器,类似于:
话虽如此:可能不建议每次都重新启动SparkContext,最好尝试找出查询失败的原因并解决根本原因,而不是重新启动上下文。
第二个注意事项:还有一个python包"retry",它可以做很多开箱即用的高级事情,请参见here
希望这能帮上忙