pyspark 如何在Spark SQL上处理AnalysisException?

xvw2m8pv  于 2023-01-29  发布在  Spark
关注(0)|答案(4)|浏览(385)

我试图在Spark中执行一个查询列表,但是如果查询没有正确运行,Spark就会抛出以下错误:分析异常:"不支持ALTER TABLE CHANGE COLUMN来更改...
这是我的代码的一部分(我正在使用Python和Spark SQL on Databricks):

for index, row in df_tables.iterrows():
  query = row["query"]
  print ("Executing query: ")
  try:
      spark.sql(query)
      print ("Query executed")
  except (ValueError, RuntimeError, TypeError, NameError):
      print("Unable to process your query dude!!")
  else:
      #do another thing

有什么方法可以捕捉到这个异常吗?ValueError,RuntimeError,TypeError,NameError似乎不起作用。在Spark的网页上没有太多关于这个的信息。

n6lpvg4x

n6lpvg4x1#

我发现在pyspark.sql.utils. https://spark.apache.org/docs/3.0.1/api/python/_modules/pyspark/sql/utils.html中定义了分析异常

import pyspark.sql.utils
try:
    spark.sql(query)
    print ("Query executed")
except pyspark.sql.utils.AnalysisException:
    print("Unable to process your query dude!!")
bq9c1y66

bq9c1y662#

可以按如下所示修改try except语句:

try:
  spark.sql(query)
  print ("Query executed")
except Exception as x:
  print("Unable to process your query dude!!" + \
        "\n" + "ERROR : " + str(x))
ep6jt1vc

ep6jt1vc3#

我认为这取决于你的需求。如果你在这个查询上运行完整的工作流,如果你想让它们通过,那么你的代码会工作得很好。但是假设你想让你的工作流或数据管道失败,那么你应该退出那个except块。
您可能无法获得确切的异常,但您肯定可以使用

except Exception as x:
  print(str(x))

您可以使用日志记录模块将更多信息放入日志中,以便进一步调查。

xxe27gdn

xxe27gdn4#

我想提出一种方法来选择特定的异常。我遇到了一个问题,即查找某个表是否已经存在。我找到的最简单的方法是这样的。当然,如果Spark维护人员更改异常的消息,这可能会中断,但我认为他们没有理由这样做,在这种情况下。

import pyspark.sql.utils

    try:
        spark.read.parquet(SOMEPATH)
    except pyspark.sql.utils.AnalysisException as e:
        if "Path does not exist:" in str(e):
            # Finding specific message of Exception.
            pass # run some code to address this specific case.
        else:
            # if this is not the AnalysisException that i was waiting,
            # i throw again the exception
            raise (e)
    except Exception as e:
        # if is another exception i can catch like this
        print(e)
        raise (e)

相关问题