pyspark 如何使用笔记本将数据从Azure Data Lake加载到Microsoft Lakehouse?

zhte4eai  于 2023-10-15  发布在  Spark
关注(0)|答案(1)|浏览(137)

我的情况是这样的:
我在Azure帐户中有一个存储,其中包含来自Dynamics 365 F&O的表,并且我有一个JSON文件,其中包含列的名称和类型。这是“头”文件,我有另一个CSV文件(可以是1个或多个CSV到同一个表)的数据。
所以,我需要合并这2个为每一张table,然后加载到我的织物湖屋。到目前为止,我正在尝试使用以下代码:

import json
import os

def get_cdm_files(directory_path):
    cdm_files = []

    for root, diers, files in os.walk(directory_path):
        for file in files:
            if file.endswith('.cdm.json'):
                cdm_files.append(os.path.join(root, file))
  
    return cdm_files

def load_table_cdm_file(cdm_file_path):
    with open(cdm_file_path.replace("abfss://[email protected]/", "/dbfs/mnt/dynamics/")) as f:
        cdm_json = json.load(f)

    colss = []
    for item in cdm_json['definitions'][0]['hasAttributes']:
        colss.append(item["name"])
  
    return spark.read.csv(cdm_file_path.replace("cdm.json", ".csv"), header=False, inferSchema=True)

def load_all_tables(cdm_files):
    tables = {}

    for cdm_file in cdm_files:
        table_name = cdm_file.split("/")[-1].replace(".cdm.json", "").lower()
        tables[table_name] = load_table_cdm_file(cdm_file)
    
    return tables

def write_table_delta(table_name, table_df):
    spark.sql(f"DROP TABLE IF EXISTS Lakehousename.Dynamics365_{table_name}")
    table_df.write.mode("overwrite").format("delta").saveAsTable(f"Dynamics365_{table_name}")

def main():
    cdm_files = get_cdm_files("abfss://[email protected]/domainname.operations.dynamics.com/Tables/")
    
    if "TABLENAME1.cdm.json" in cdm_files:
        cdm_files.remove("abfss://[email protected]/domainname.operations.dynamics.com/Tables/Custom/TABLENAME1.cdm.json")
    
    if "TABLENAME2.cdm.json" in cdm_files:
        cdm_files.remove("abfss://[email protected]/domainname.operations.dynamics.com/Tables/Custom/TABLENAME2.cdm.json")
    
    if "TABLE3.cdm.json" in cdm_files:
        cdm_files.remove("abfss://[email protected]/domainname.operations.dynamics.com/Tables/Custom/TABLE3.cdm.json")

    tables = load_all_tables(cdm_files)

    for table_name, table_df in tables.items():
        write_table_delta(table_name, table_df)

我试着寻找指南,但因为这是一个新事物,没有太多的搜索,甚至人工智能可以帮助。

5kgi1eie

5kgi1eie1#

按如下方式更改每个函数。

获取cdm文件

import json
import os

def get_cdm_files(directory_path):
    cdm_files = []

    for root, diers, files in os.walk(directory_path.replace("abfss://[email protected]/", "/dbfs/mnt/dynamics/")):
        for file in files:
            if file.endswith('.cdm.json'):
                cdm_files.append(os.path.join(root.replace( "/dbfs/mnt/dynamics/","abfss://[email protected]/"), file))
  
    return cdm_files

将获得.cdm.json文件。

接下来,

load_table_cdm_file

用于使用json文件中的模式阅读csv文件。

from pyspark.sql.types import StructField,StructType,StringType,DoubleType,IntegerType

data_type_mapping = {
        "string": StringType(),
        "integer": IntegerType(),
        "double": DoubleType(),
        # Add more mappings as needed
    }

def load_table_cdm_file(cdm_file_path):
    with open(cdm_file_path.replace("abfss://[email protected]/", "/dbfs/mnt/dynamics/")) as f:
        cdm_json = json.load(f)

    columns = [StructField(item["name"], data_type_mapping.get(item["type"],StringType()), True) for item in cdm_json['definitions'][0]['hasAttributes']]
    schema = StructType(columns)
    print(schema)
    df = spark.read.format("csv").schema(schema).load(cdm_file_path.replace("cdm.json", "csv"), header=False)
    return df

load_all_tableskeep中没有任何变化。现在将表写入lakehouse,如果你在lakehouse中使用notebook,write_table_delta函数可以正常工作。

如果你在数据库中使用notebook,请使用下面的代码编写。在运行此代码之前,请确保选中高级选项下的标记为用户级数据访问启用凭据透传

abfss路径复制到lakehouse表。

转到表的属性并复制路径,它类似于下面的一个。
abfss://<kjfneldqw>@msit-onelake.dfs.fabric.microsoft.com/<6382ey398e>/Tables

write_table_delta

lakehouse_table_path="abfss_path" #your_abfss_path_to_lakehouse_table
def write_table_delta(table_name, table_df):
    table_df.write.mode("overwrite").format("delta").save(f"{lakehouse_table_path}/Dynamics365_{table_name}")

现在运行你的主代码。

cdm_files = get_cdm_files("abfss://[email protected]/domainname.operations.dynamics.com/Tables/")
print(cdm_files)
tables = load_all_tables(cdm_files)
print(tables)
for table_name, table_df in tables.items():
    write_table_delta(table_name, table_df)

输出量:

在Lakehouse。

同样,你可以使用spark.read.format("delta").load("abfss_path")提供lakehouse abfss表路径来读取这个表。

相关问题