我正在尝试合并大约150个表,每个表有大约3000列和350行。
列不完全匹配,因此我编写了下面的代码来对齐表:
def spark_union(df_list):
# Create a list of all the column names and sort them
cols = set()
for df in df_list:
cols.update(df.columns)
cols = sorted(cols)
# Add missing columns to each df
for i in range(len(df_list)):
missing = set(cols) - set(df_list[i].columns)
select_cols = [c if c not in missing else F.lit(None).alias(c) for c in cols]
df_list[i] = df_list[i].select(select_cols) # columns in the same order
return reduce(DataFrame.unionAll, df_list)
不幸的是,这需要很长的时间来运行(最终的reduce行需要15个多小时),考虑到数据的大小,这似乎是不正确的。
暂无答案!
目前还没有任何答案,快来回答吧!