主题
我正面临一个我一直在努力解决的问题:
摄取已被Autoloader捕获但被新数据覆盖的文件。
详细的问题描述
我有一个登陆文件夹在一个数据湖,每天都有一个新的文件被张贴。你可以检查下面的图像示例:
自动化每天都会发布一个带有新数据的文件。该文件的名称带有后缀,表示当前发布期间的年和月。
此命名惯例会产生每天都会以当月的累积数据撷取覆写的档案。文件夹中的档案数目只会在当月结束而新月份开始时增加。
为了解决这个问题,我使用Databricks的Autoloader特性实现了以下PySpark代码:
# Import functions
from pyspark.sql.functions import input_file_name, current_timestamp, col
# Define variables used in code below
checkpoint_directory = "abfss://gpdi-files@hgbsprodgbsflastorage01.dfs.core.windows.net/RAW/Test/_checkpoint/sapex_ap_posted"
data_source = f"abfss://gpdi-files@hgbsprodgbsflastorage01.dfs.core.windows.net/RAW/Test"
source_format = "csv"
table_name = "prod_gbs_gpdi.bronze_data.sapex_ap_posted"
# Configure Auto Loader to ingest csv data to a Delta table
query = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", source_format)
.option("cloudFiles.schemaLocation", checkpoint_directory)
.option("header", "true")
.option("delimiter", ";")
.option("skipRows", 7)
.option("modifiedAfter", "2022-10-15 11:34:00.000000 UTC-3") # To ingest files that have a modification timestamp after the provided timestamp.
.option("pathGlobFilter", "AP_SAPEX_KPI_001 - Posted Invoices in *.CSV") # A potential glob pattern to provide for choosing files.
.load(data_source)
.select(
"*",
current_timestamp().alias("_JOB_UPDATED_TIME"),
input_file_name().alias("_JOB_SOURCE_FILE"),
col("_metadata.file_modification_time").alias("_MODIFICATION_TIME")
)
.writeStream
.option("checkpointLocation", checkpoint_directory)
.option("mergeSchema", "true")
.trigger(availableNow=True)
.toTable(table_name)
)
这段代码允许我捕获每个新文件,并将其摄取到一个原始表中。
问题是,只有当新文件到达时,它才能正常工作。但是,如果所需的文件在着陆文件夹中被覆盖,Autoloader什么也不做,因为它假设文件已经被摄取,即使文件的修改时间已经改变。
暂时失败
我尝试在代码中使用modifiedAfter
选项。但它似乎只是作为一个过滤器来防止带有时间戳的文件被摄取,如果它的属性早于时间戳字符串中提到的阈值。它不重新摄取时间戳早于modifiedAfter
阈值的文件。
.option("modifiedAfter", "2022-10-15 14:10:00.000000 UTC-3")
问题
是否有人知道如何检测已被摄取但修改日期不同的文件,以及如何重新处理该文件以将其加载到表中?
1条答案
按热度按时间2izufjch1#
我已经找到了这个问题的解决方案。在Databricks文档的Autoloader选项列表中,可以看到一个名为
cloudFiles.allowOverwrites
的选项。如果在流查询中启用该选项,则每当文件在湖中被覆盖时,该查询都会将其接收到目标表中。请注意,每当新文件被覆盖时,该选项可能会复制数据。因此,下游处理将是必要的。