pyspark SQL上下文 Package 器

fiei3ece  于 2023-02-15  发布在  Spark
关注(0)|答案(1)|浏览(176)

我想创建一个SQLContext的 Package 器,它应该使下面的sql方法(sqlc.sql)在失败之前运行查询最多30次。对于每次失败的重试,Spark上下文应该关闭并重新启动。

from pyspark.sql import SQLContext
from pyspark import SparkContext

sc=SparkContext()
sqlc=SQLContext()
sqlc.sql("select * from table").write.format("parquet").mode("overwrite").saveAsTable("new_table")

目前我正在做如下

from pyspark.sql import SQLContext
from pyspark import SparkContext

global sc
global sqlc

sc=SparkContext()
sqlc=SQLContext()

for i in range(1,4):
     try:
          sqlc.sql("select * from table").write.format("parquet").mode("overwrite").saveAsTable("new_table")
          break
     exception e:
          sc.stop()
          sc=SparkContext()
          sqlc=SQLContext(sc)

由于我需要将这些更改应用到数百个python脚本中,所以我希望在调用sqlc.sql方法时在内部应用此逻辑。

drnojrws

drnojrws1#

最简单的方法可能是编写一个装饰器,类似于:

import logging
import time

# create a decorator that will retry a function if it fails
def retry_on_exception(max_retries=3, retry_interval=1):
    def wrapper(func):
        def wrapped(*args, **kwargs):
            retries = 0
            while retries < max_retries:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    retries += 1
                    logging.error(f"SQL query failed with error: {e}. Retrying (attempt {retries}/{max_retries})...")
                    # You can restart SparkContext here if you want
                    time.sleep(retry_interval)
            raise Exception(f"SQL query failed after {max_retries} attempts.")
        return wrapped
    return wrapper

# use the retry wrapper with the query function
@retry_on_exception(max_retries=2, retry_interval=1)
def execute_sql_query(query):
    # Code to execute the SQL query goes here
    # raise Exception("SQL query failed.") # example of failure
    return "this" # example of success

# call the query function
execute_sql_query("SELECT * FROM table")

话虽如此:可能不建议每次都重新启动SparkContext,最好尝试找出查询失败的原因并解决根本原因,而不是重新启动上下文。
第二个注意事项:还有一个python包"retry",它可以做很多开箱即用的高级事情,请参见here
希望这能帮上忙

相关问题