pyspark - Validating data with pydeequ in a Glue job prevents the job from completing

qxsslcnc posted on 2023-03-11 in Spark

I'm trying to create a job in AWS Glue Studio following the AWS Big Data Blog article, using pydeequ to validate data.
I got pydeequ running in the job, but when certain Check methods are used, the job keeps running even though all processing has completed successfully.
I checked the execution logs and saw entries like glue.LogPusher (Logging.scala:logInfo(57)): uploading /tmp/spark-event-logs/ to s3://bucket/sparkHistoryLogs/ emitted periodically; it seems a process called "LogPusher" keeps retrying the upload of event logs to S3.
Does anyone know what causes this, or what to do about it?
Below is the Glue job code that reproduces the problem.

# Language of job: Python
# Glue version: 3.0
# Deequ version: deequ-2.0.1-spark-3.2
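
# Note: depending on the PyDeequ release, the pydeequ imports below may need
# the SPARK_VERSION environment variable set first (PyDeequ reads it to pick
# the matching Deequ artifact). A minimal sketch; the value "3.2" is an
# assumption chosen to match the deequ jar noted above:
import os
os.environ["SPARK_VERSION"] = "3.2"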

from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.dynamicframe import DynamicFrame
from pydeequ.analyzers import (
    AnalysisRunner,
    AnalyzerContext,
    Completeness,
    Maximum,
    MaxLength,
    Minimum,
    MinLength,
    Size,
    UniqueValueRatio,
)
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationResult, VerificationSuite

glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session

dyf = glue_context.create_dynamic_frame.from_options(
    format_options={"quoteChar": '"', "withHeader": True, "separator": ","},
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": [f"s3://bucket/test.csv"],
        "recurse": True,
    }
)

df = dyf.toDF()
df.show()

# output:
# +-------+-------+
# |column1|column2|
# +-------+-------+
# |      a|      1|
# |      b|      2|
# |      c|      3|
# +-------+-------+

runner = AnalysisRunner(spark).onData(df)

runner.addAnalyzer(Size())
runner.addAnalyzer(Completeness("column1"))
runner.addAnalyzer(Completeness("column2"))
runner.addAnalyzer(UniqueValueRatio(["column1"]))
runner.addAnalyzer(UniqueValueRatio(["column2"]))
runner.addAnalyzer(MinLength("column2"))
runner.addAnalyzer(MaxLength("column2"))
runner.addAnalyzer(Minimum("column2"))
runner.addAnalyzer(Maximum("column2"))

result = runner.run()
result_df = AnalyzerContext.successMetricsAsDataFrame(spark, result)
result_df.show(truncate=False)

# output:
# +-------+--------+----------------+-----+
# |entity |instance|name            |value|
# +-------+--------+----------------+-----+
# |Column |column2 |UniqueValueRatio|1.0  |
# |Dataset|*       |Size            |3.0  |
# |Column |column1 |Completeness    |1.0  |
# |Column |column1 |UniqueValueRatio|1.0  |
# |Column |column2 |Completeness    |1.0  |
# |Column |column2 |MinLength       |1.0  |
# |Column |column2 |MaxLength       |1.0  |
# +-------+--------+----------------+-----+
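
# Note: the Minimum and Maximum metrics for column2 are missing above. The
# CSV reader yields string columns, and Deequ's Minimum/Maximum analyzers
# only succeed on numeric columns, so those metrics fail and are omitted by
# successMetricsAsDataFrame. A minimal sketch of casting first (this cast is
# an illustration, not part of the original job):
from pyspark.sql.functions import col

df_numeric = df.withColumn("column2", col("column2").cast("int"))
numeric_result = (
    AnalysisRunner(spark)
    .onData(df_numeric)
    .addAnalyzer(Minimum("column2"))
    .addAnalyzer(Maximum("column2"))
    .run()
)
AnalyzerContext.successMetricsAsDataFrame(spark, numeric_result).show(truncate=False)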

check_1 = Check(spark, CheckLevel.Warning, "isComplete").isComplete("column1")
result_1 = VerificationSuite(spark).onData(df).addCheck(check_1).run()
result_df_1 = VerificationResult.checkResultsAsDataFrame(spark, result_1)
result_df_1.show(truncate=False)

# output:
# +----------+-----------+------------+--------------------------------------------------+-----------------+------------------+
# |check     |check_level|check_status|constraint                                        |constraint_status|constraint_message|
# +----------+-----------+------------+--------------------------------------------------+-----------------+------------------+
# |isComplete|Warning    |Success     |CompletenessConstraint(Completeness(column1,None))|Success          |                  |
# +----------+-----------+------------+--------------------------------------------------+-----------------+------------------+

# Up to this point, the job can be completed successfully.

check_2 = Check(spark, CheckLevel.Warning, "hasMinLength").hasMinLength("column1", lambda x: x >= 1)
result_2 = VerificationSuite(spark).onData(df).addCheck(check_2).run()
result_df_2 = VerificationResult.checkResultsAsDataFrame(spark, result_2)
result_df_2.show(truncate=False)

# output:
# +------------+-----------+------------+--------------------------------------------+-----------------+------------------+
# |check       |check_level|check_status|constraint                                  |constraint_status|constraint_message|
# +------------+-----------+------------+--------------------------------------------+-----------------+------------------+
# |hasMinLength|Warning    |Success     |MinLengthConstraint(MinLength(column1,None))|Success          |                  |
# +------------+-----------+------------+--------------------------------------------+-----------------+------------------+

# When the above runs, the results are displayed normally, but the job never finishes.

i7uq4tfw 1#

You have to shut down your Spark session:

spark.sparkContext._gateway.shutdown_callback_server()
spark.stop()
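
The hang itself appears to come from the Python-side assertion: a check like hasMinLength("column1", lambda x: ...) makes pydeequ start a Py4J callback server so the JVM can call back into the Python lambda, and that server's non-daemon thread keeps the driver alive after the job logic ends (isComplete passes no Python lambda, which is consistent with check_1 alone finishing fine). A minimal sketch of where the shutdown fits in the job above; the try/finally placement is a suggestion, not part of the original answer:

try:
    # ... all the pydeequ analysis and verification code above ...
    result_df_2.show(truncate=False)
finally:
    # Shut down the Py4J callback server pydeequ started for the lambda
    # assertions, then stop Spark so the Glue job can terminate.
    spark.sparkContext._gateway.shutdown_callback_server()
    spark.stop()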
