pyspark 在Databricks中为参数化笔记本创建模式字典

ikfrs5lh  于 2023-01-16  发布在  Spark
关注(0)|答案(1)|浏览(108)

我在Databricks中有一个笔记本,如下所示。

from pyspark.sql import functions as F

# Define the input path. Files looks like COMPANYX_20220113.csv
input_path = '/mnt/stackoverflow/source/COMPANYX*.csv'

# Define the output path
output_path = '/mnt/stackoverflow/raw/COMPANYX'

# Read in the CSV file
raw_df = spark.read.csv(path=input_path, schema=schema, sep=';', header=False, inferSchema=False)

# Write the DataFrame in the delta format - one time operation, commented out after first run
filtered_df.write.format("delta").mode("append").save(output_path)

# Create a delta table - one time operation, commented out after first run
spark.sql(f"CREATE TABLE IF NOT EXISTS stackoverflow.RAW_COMPANYX USING DELTA LOCATION '{output_path}'")

# Create temporary view to use as source table in the merge
filtered_df.createOrReplaceTempView("new_rows")

# Save the temporary table in the delta table using merge logic
spark.sql(f"MERGE INTO stackoverflow.RAW_COMPANYX d \
          USING new_rows s \
          ON d.DATUM = s.DATUM \
          AND d.FORNR = s.FORNR \
          AND d.AVD = s.AVD \
          AND d.KBESTNR = s.KBESTNR \
          AND d.VAGNNR = s.VAGNNR \
          WHEN MATCHED THEN UPDATE SET * \
          WHEN NOT MATCHED THEN INSERT * \
          ")

我的问题是:这个笔记本应该参数化可以在source中登陆的不同csv文件。COMPANYX、COMPANYY和COMPANYZ都在这里登陆它们的csv文件,但是它们都有不同的模式。
对于schema=schema,我想知道如何旋转从动态路径读取csv文件时使用的模式。
我想创建一个模式字典,并根据调用笔记本时从ADF发送的参数获取正确的键:值对。
你将如何着手做这件事?对以上内容还有其他反馈吗?
注意:我在上面使用pyspark.sql.functions的脚本中排除了一些转换。

eqqqjvef

eqqqjvef1#

1.在数据块中创建小部件并从ADF读取数据。
1.创建一个python函数,如下所示,从计划声明的模式字典中分配模式。

def check_file_name(input_path):
    if input_path.split(".")[0].endswidth("X"):
       schema = assign_x_schema
    elif input_path.split(".")[0].endswidth("Y"):
       schema = assign_y_schema
    elif input_path.split(".")[0].endswidth("Z"):
       schema = assign_Z_schema
    return schema

schema = check_file_name(input_path)

可能有更好的办法,我在想这个办法。
希望这对你有帮助!!

相关问题