我的情况是这样的:
我在Azure帐户中有一个存储,其中包含来自Dynamics 365 F&O的表,并且我有一个JSON文件,其中包含列的名称和类型。这是“头”文件,我有另一个CSV文件(可以是1个或多个CSV到同一个表)的数据。
所以,我需要合并这2个为每一张table,然后加载到我的织物湖屋。到目前为止,我正在尝试使用以下代码:
import json
import os
def get_cdm_files(directory_path):
cdm_files = []
for root, diers, files in os.walk(directory_path):
for file in files:
if file.endswith('.cdm.json'):
cdm_files.append(os.path.join(root, file))
return cdm_files
def load_table_cdm_file(cdm_file_path):
with open(cdm_file_path.replace("abfss://[email protected]/", "/dbfs/mnt/dynamics/")) as f:
cdm_json = json.load(f)
colss = []
for item in cdm_json['definitions'][0]['hasAttributes']:
colss.append(item["name"])
return spark.read.csv(cdm_file_path.replace("cdm.json", ".csv"), header=False, inferSchema=True)
def load_all_tables(cdm_files):
tables = {}
for cdm_file in cdm_files:
table_name = cdm_file.split("/")[-1].replace(".cdm.json", "").lower()
tables[table_name] = load_table_cdm_file(cdm_file)
return tables
def write_table_delta(table_name, table_df):
spark.sql(f"DROP TABLE IF EXISTS Lakehousename.Dynamics365_{table_name}")
table_df.write.mode("overwrite").format("delta").saveAsTable(f"Dynamics365_{table_name}")
def main():
cdm_files = get_cdm_files("abfss://[email protected]/domainname.operations.dynamics.com/Tables/")
if "TABLENAME1.cdm.json" in cdm_files:
cdm_files.remove("abfss://[email protected]/domainname.operations.dynamics.com/Tables/Custom/TABLENAME1.cdm.json")
if "TABLENAME2.cdm.json" in cdm_files:
cdm_files.remove("abfss://[email protected]/domainname.operations.dynamics.com/Tables/Custom/TABLENAME2.cdm.json")
if "TABLE3.cdm.json" in cdm_files:
cdm_files.remove("abfss://[email protected]/domainname.operations.dynamics.com/Tables/Custom/TABLE3.cdm.json")
tables = load_all_tables(cdm_files)
for table_name, table_df in tables.items():
write_table_delta(table_name, table_df)
我试着寻找指南,但因为这是一个新事物,没有太多的搜索,甚至人工智能可以帮助。
1条答案
按热度按时间5kgi1eie1#
按如下方式更改每个函数。
获取cdm文件
将获得
.cdm.json
文件。接下来,
load_table_cdm_file
用于使用json文件中的模式阅读csv文件。
在load_all_tableskeep中没有任何变化。现在将表写入lakehouse,如果你在lakehouse中使用notebook,write_table_delta函数可以正常工作。
或
如果你在数据库中使用notebook,请使用下面的代码编写。在运行此代码之前,请确保选中高级选项下的标记为用户级数据访问启用凭据透传。
将
abfss
路径复制到lakehouse表。转到表的属性并复制路径,它类似于下面的一个。
abfss://<kjfneldqw>@msit-onelake.dfs.fabric.microsoft.com/<6382ey398e>/Tables
write_table_delta
现在运行你的主代码。
输出量:
在Lakehouse。
同样,你可以使用
spark.read.format("delta").load("abfss_path")
提供lakehouse abfss表路径来读取这个表。