I tried to create a job in AWS Glue Studio following an AWS Big Data Blog article and to validate data with pydeequ.
I got pydeequ running in the job, but when certain Check methods are used, the job keeps running even though all processing has completed successfully.
I checked the execution logs and found that entries like glue.LogPusher (Logging.scala:logInfo(57)): uploading /tmp/spark-event-logs/ to s3://bucket/sparkHistoryLogs/
were written periodically; a process named "LogPusher" seems to be repeatedly trying to upload the event logs to S3.
Does anyone know what causes this, or what I should do about it?
Below is the Glue job code that reproduces the problem.
# Language of job: Python
# Glue version: 3.0
# Deequ version: deequ-2.0.1-spark-3.2
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.dynamicframe import DynamicFrame
from pydeequ.analyzers import (
    AnalysisRunner,
    AnalyzerContext,
    Completeness,
    Maximum,
    MaxLength,
    Minimum,
    MinLength,
    Size,
    UniqueValueRatio,
)
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationResult, VerificationSuite
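# Create the Glue context; pydeequ runs against the underlying SparkSession.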
glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session
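# Read the test CSV from S3 into a DynamicFrame and convert it to a Spark DataFrame.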
dyf = glue_context.create_dynamic_frame.from_options(
    format_options={"quoteChar": '"', "withHeader": True, "separator": ","},
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": [f"s3://bucket/test.csv"],
        "recurse": True,
    },
)
df = dyf.toDF()
df.show()
# output:
# +-------+-------+
# |column1|column2|
# +-------+-------+
# | a| 1|
# | b| 2|
# | c| 3|
# +-------+-------+
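# Run a set of Deequ analyzers (size, completeness, uniqueness, string length, min/max) on the DataFrame.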
runner = AnalysisRunner(spark).onData(df)
runner.addAnalyzer(Size())
runner.addAnalyzer(Completeness("column1"))
runner.addAnalyzer(Completeness("column2"))
runner.addAnalyzer(UniqueValueRatio(["column1"]))
runner.addAnalyzer(UniqueValueRatio(["column2"]))
runner.addAnalyzer(MinLength("column2"))
runner.addAnalyzer(MaxLength("column2"))
runner.addAnalyzer(Minimum("column2"))
runner.addAnalyzer(Maximum("column2"))
result = runner.run()
result_df = AnalyzerContext.successMetricsAsDataFrame(spark, result)
result_df.show(truncate=False)
# output:
# +-------+--------+----------------+-----+
# |entity |instance|name |value|
# +-------+--------+----------------+-----+
# |Column |column2 |UniqueValueRatio|1.0 |
# |Dataset|* |Size |3.0 |
# |Column |column1 |Completeness |1.0 |
# |Column |column1 |UniqueValueRatio|1.0 |
# |Column |column2 |Completeness |1.0 |
# |Column |column2 |MinLength |1.0 |
# |Column |column2 |MaxLength |1.0 |
# +-------+--------+----------------+-----+
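# First verification: a single isComplete constraint on column1.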
check_1 = Check(spark, CheckLevel.Warning, "isComplete").isComplete("column1")
result_1 = VerificationSuite(spark).onData(df).addCheck(check_1).run()
result_df_1 = VerificationResult.checkResultsAsDataFrame(spark, result_1)
result_df_1.show(truncate=False)
# output:
# +----------+-----------+------------+--------------------------------------------------+-----------------+------------------+
# |check |check_level|check_status|constraint |constraint_status|constraint_message|
# +----------+-----------+------------+--------------------------------------------------+-----------------+------------------+
# |isComplete|Warning |Success |CompletenessConstraint(Completeness(column1,None))|Success | |
# +----------+-----------+------------+--------------------------------------------------+-----------------+------------------+
# Up to this point, the job can be completed successfully.
check_2 = Check(spark, CheckLevel.Warning, "hasMinLength").hasMinLength("column1", lambda x: x >= 1)
result_2 = VerificationSuite(spark).onData(df).addCheck(check_2).run()
result_df_2 = VerificationResult.checkResultsAsDataFrame(spark, result_2)
result_df_2.show(truncate=False)
# output:
# +------------+-----------+------------+--------------------------------------------+-----------------+------------------+
# |check |check_level|check_status|constraint |constraint_status|constraint_message|
# +------------+-----------+------------+--------------------------------------------+-----------------+------------------+
# |hasMinLength|Warning |Success |MinLengthConstraint(MinLength(column1,None))|Success | |
# +------------+-----------+------------+--------------------------------------------+-----------------+------------------+
# When the above process runs, the results are displayed normally, but the job never finishes.
1 Answer
You have to stop your Spark session:
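A minimal sketch of what that can look like at the end of the Glue job shown above (the extra callback-server shutdown is an assumption about pydeequ's Python-JVM bridge, not something stated in the question):

# Shut down the Py4J callback server that pydeequ starts for its listeners
# (assumed to be what keeps the Glue job alive), then stop the Spark session.
spark.sparkContext._gateway.shutdown_callback_server()
spark.stop()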