pyspark 在数据块自动加载器中处理被覆盖的文件

lp0sw83n  于 2022-11-01  发布在  Spark
关注(0)|答案(1)|浏览(161)

主题

我正面临一个我一直在努力解决的问题:
摄取已被Autoloader捕获但被新数据覆盖的文件。

详细的问题描述

我有一个登陆文件夹在一个数据湖,每天都有一个新的文件被张贴。你可以检查下面的图像示例:

自动化每天都会发布一个带有新数据的文件。该文件的名称带有后缀,表示当前发布期间的年和月。
此命名惯例会产生每天都会以当月的累积数据撷取覆写的档案。文件夹中的档案数目只会在当月结束而新月份开始时增加。
为了解决这个问题,我使用Databricks的Autoloader特性实现了以下PySpark代码:


# Import functions

from pyspark.sql.functions import input_file_name, current_timestamp, col

# Define variables used in code below

checkpoint_directory = "abfss://gpdi-files@hgbsprodgbsflastorage01.dfs.core.windows.net/RAW/Test/_checkpoint/sapex_ap_posted"
data_source = f"abfss://gpdi-files@hgbsprodgbsflastorage01.dfs.core.windows.net/RAW/Test"
source_format = "csv"
table_name = "prod_gbs_gpdi.bronze_data.sapex_ap_posted"

# Configure Auto Loader to ingest csv data to a Delta table

query = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", source_format)
    .option("cloudFiles.schemaLocation", checkpoint_directory)
    .option("header", "true")
    .option("delimiter", ";")
    .option("skipRows", 7)
    .option("modifiedAfter", "2022-10-15 11:34:00.000000 UTC-3")  # To ingest files that have a modification timestamp after the provided timestamp.
    .option("pathGlobFilter", "AP_SAPEX_KPI_001 - Posted Invoices in *.CSV")  # A potential glob pattern to provide for choosing files.
    .load(data_source)
    .select(
        "*", 
        current_timestamp().alias("_JOB_UPDATED_TIME"), 
        input_file_name().alias("_JOB_SOURCE_FILE"), 
        col("_metadata.file_modification_time").alias("_MODIFICATION_TIME")
    )
    .writeStream
    .option("checkpointLocation", checkpoint_directory)
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .toTable(table_name)
)

这段代码允许我捕获每个新文件,并将其摄取到一个原始表中。
问题是,只有当新文件到达时,它才能正常工作。但是,如果所需的文件在着陆文件夹中被覆盖,Autoloader什么也不做,因为它假设文件已经被摄取,即使文件的修改时间已经改变。

暂时失败

我尝试在代码中使用modifiedAfter选项。但它似乎只是作为一个过滤器来防止带有时间戳的文件被摄取,如果它的属性早于时间戳字符串中提到的阈值。它不重新摄取时间戳早于modifiedAfter阈值的文件。

.option("modifiedAfter", "2022-10-15 14:10:00.000000 UTC-3")

问题

是否有人知道如何检测已被摄取但修改日期不同的文件,以及如何重新处理该文件以将其加载到表中?

2izufjch

2izufjch1#

我已经找到了这个问题的解决方案。在Databricks文档的Autoloader选项列表中,可以看到一个名为cloudFiles.allowOverwrites的选项。如果在流查询中启用该选项,则每当文件在湖中被覆盖时,该查询都会将其接收到目标表中。请注意,每当新文件被覆盖时,该选项可能会复制数据。因此,下游处理将是必要的。

相关问题