我在Databricks中有一个笔记本,如下所示。
from pyspark.sql import functions as F
# Define the input path. Files looks like COMPANYX_20220113.csv
input_path = '/mnt/stackoverflow/source/COMPANYX*.csv'
# Define the output path
output_path = '/mnt/stackoverflow/raw/COMPANYX'
# Read in the CSV file
raw_df = spark.read.csv(path=input_path, schema=schema, sep=';', header=False, inferSchema=False)
# Write the DataFrame in the delta format - one time operation, commented out after first run
filtered_df.write.format("delta").mode("append").save(output_path)
# Create a delta table - one time operation, commented out after first run
spark.sql(f"CREATE TABLE IF NOT EXISTS stackoverflow.RAW_COMPANYX USING DELTA LOCATION '{output_path}'")
# Create temporary view to use as source table in the merge
filtered_df.createOrReplaceTempView("new_rows")
# Save the temporary table in the delta table using merge logic
spark.sql(f"MERGE INTO stackoverflow.RAW_COMPANYX d \
USING new_rows s \
ON d.DATUM = s.DATUM \
AND d.FORNR = s.FORNR \
AND d.AVD = s.AVD \
AND d.KBESTNR = s.KBESTNR \
AND d.VAGNNR = s.VAGNNR \
WHEN MATCHED THEN UPDATE SET * \
WHEN NOT MATCHED THEN INSERT * \
")
我的问题是:这个笔记本应该参数化可以在source
中登陆的不同csv文件。COMPANYX、COMPANYY和COMPANYZ都在这里登陆它们的csv文件,但是它们都有不同的模式。
对于schema=schema
,我想知道如何旋转从动态路径读取csv文件时使用的模式。
我想创建一个模式字典,并根据调用笔记本时从ADF发送的参数获取正确的键:值对。
你将如何着手做这件事?对以上内容还有其他反馈吗?
注意:我在上面使用pyspark.sql.functions
的脚本中排除了一些转换。
1条答案
按热度按时间eqqqjvef1#
1.在数据块中创建小部件并从ADF读取数据。
1.创建一个python函数,如下所示,从计划声明的模式字典中分配模式。
可能有更好的办法,我在想这个办法。
希望这对你有帮助!!