from pyspark.sql.functions import lit
def __order_df_and_add_missing_cols(df, columns_order_list, df_missing_fields):
""" return ordered dataFrame by the columns order list with null in missing columns """
if not df_missing_fields: # no missing fields for the df
return df.select(columns_order_list)
else:
columns = []
for colName in columns_order_list:
if colName not in df_missing_fields:
columns.append(colName)
else:
columns.append(lit(None).alias(colName))
return df.select(columns)
def __add_missing_columns(df, missing_column_names):
""" Add missing columns as null in the end of the columns list """
list_missing_columns = []
for col in missing_column_names:
list_missing_columns.append(lit(None).alias(col))
return df.select(df.schema.names + list_missing_columns)
def __order_and_union_d_fs(left_df, right_df, left_list_miss_cols, right_list_miss_cols):
""" return union of data frames with ordered columns by left_df. """
left_df_all_cols = __add_missing_columns(left_df, left_list_miss_cols)
right_df_all_cols = __order_df_and_add_missing_cols(right_df, left_df_all_cols.schema.names,
right_list_miss_cols)
return left_df_all_cols.union(right_df_all_cols)
def union_d_fs(left_df, right_df):
""" Union between two dataFrames, if there is a gap of column fields,
it will append all missing columns as nulls """
# Check for None input
if left_df is None:
raise ValueError('left_df parameter should not be None')
if right_df is None:
raise ValueError('right_df parameter should not be None')
# For data frames with equal columns and order- regular union
if left_df.schema.names == right_df.schema.names:
return left_df.union(right_df)
else: # Different columns
# Save dataFrame columns name list as set
left_df_col_list = set(left_df.schema.names)
right_df_col_list = set(right_df.schema.names)
# Diff columns between left_df and right_df
right_list_miss_cols = list(left_df_col_list - right_df_col_list)
left_list_miss_cols = list(right_df_col_list - left_df_col_list)
return __order_and_union_d_fs(left_df, right_df, left_list_miss_cols, right_list_miss_cols)
修改了albertobonsanto的版本以保留原始列顺序(op暗示顺序应该与原始表匹配)。此外,还有 match 部分导致intellij警告。 以下是我的版本:
def unionDifferentTables(df1: DataFrame, df2: DataFrame): DataFrame = {
val cols1 = df1.columns.toSet
val cols2 = df2.columns.toSet
val total = cols1 ++ cols2 // union
val order = df1.columns ++ df2.columns
val sorted = total.toList.sortWith((a,b)=> order.indexOf(a) < order.indexOf(b))
def expr(myCols: Set[String], allCols: List[String]) = {
allCols.map( {
case x if myCols.contains(x) => col(x)
case y => lit(null).as(y)
})
}
df1.select(expr(cols1, sorted): _*).unionAll(df2.select(expr(cols2, sorted): _*))
}
# df1 and df2 are assumed to be the given dataFrames from the question
# Get the lacking columns for each dataframe and set them to null in the respective dataFrame.
# First do so for df1...
for column in [column for column in df1.columns if column not in df2.columns]:
df1 = df1.withColumn(column, lit(None))
# ... and then for df2
for column in [column for column in df2.columns if column not in df1.columns]:
df2 = df2.withColumn(column, lit(None))
import pyspark.sql.functions as F
def union_different_schemas(df1, df2):
# Get a list of all column names in both dfs
columns_df1 = df1.columns
columns_df2 = df2.columns
# Get a list of datatypes of the columns
data_types_df1 = [i.dataType for i in df1.schema.fields]
data_types_df2 = [i.dataType for i in df2.schema.fields]
# We go through all columns in df1 and if they are not in df2, we add
# them (and specify the correct datatype too)
for col, typ in zip(columns_df1, data_types_df1):
if col not in df2.columns:
df2 = df2\
.withColumn(col, F.lit(None).cast(typ))
# Now df2 has all missing columns from df1, let's do the same for df1
for col, typ in zip(columns_df2, data_types_df2):
if col not in df1.columns:
df1 = df1\
.withColumn(col, F.lit(None).cast(typ))
# Now df1 and df2 have the same columns, not necessarily in the same
# order, therefore we use unionByName
combined_df = df1\
.unionByName(df2)
return combined_df
22条答案
按热度按时间bf1o4zei16#
下面是使用pyspark的python 3.0代码:
0kjbasz617#
修改了albertobonsanto的版本以保留原始列顺序(op暗示顺序应该与原始表匹配)。此外,还有
match
部分导致intellij警告。以下是我的版本:
ejk8hzay18#
有许多简洁的方法来处理这个问题,但要适度牺牲性能。
这就是实现这个技巧的函数。对每个Dataframe使用tojson就形成了一个json联合。这将保留排序和数据类型。
唯一的问题是tojson的价格相对较高(不过,你可能不会得到10-15%的减速)。但是,这样可以保持代码的干净。
4nkexdtk19#
如果您是从文件加载,我想您可以使用read函数和文件列表。
结果Dataframe将具有合并列。
xmjla07d20#
我发现这里大多数python的答案都有点太晦涩了,如果你只是简单的回答
lit(None)
-变通方法(这也是我知道的唯一方法)。作为替代方案,这可能有用:然后再做
union()
你想做的。注意:如果列顺序与
df1
以及df2
使用unionByName()
!9vw9lbht21#
此函数接收具有不同模式的两个Dataframe(df1和df2)并将它们合并。首先,我们需要通过将df1到df2的所有(丢失的)列添加到同一个模式中,反之亦然。要向df添加新的空列,我们需要指定数据类型。
jhkqcmku22#
一个非常简单的方法-
select
Dataframe和Dataframe中顺序相同的列使用unionAll
```df1.select('code', 'date', 'A', 'B', 'C', lit(None).alias('D'), lit(None).alias('E'))
.unionAll(df2.select('code', 'date', lit(None).alias('A'), 'B', 'C', 'D', 'E'))