How do I perform a union of two DataFrames with different numbers of columns in Spark?

qacovj5a, posted 2021-07-09 in Spark

I have two DataFrames:

I need a union like this:

The unionAll function does not work here, because the two DataFrames have different numbers of columns and different column names.
How can I do this?


bf1o4zei #16

Here is the Python 3 code, using PySpark:

from pyspark.sql.functions import lit

def __order_df_and_add_missing_cols(df, columns_order_list, df_missing_fields):
    """ return ordered dataFrame by the columns order list with null in missing columns """
    if not df_missing_fields:  # no missing fields for the df
        return df.select(columns_order_list)
    else:
        columns = []
        for colName in columns_order_list:
            if colName not in df_missing_fields:
                columns.append(colName)
            else:
                columns.append(lit(None).alias(colName))
        return df.select(columns)

def __add_missing_columns(df, missing_column_names):
    """ Add missing columns as null in the end of the columns list """
    list_missing_columns = []
    for col in missing_column_names:
        list_missing_columns.append(lit(None).alias(col))

    return df.select(df.schema.names + list_missing_columns)

def __order_and_union_d_fs(left_df, right_df, left_list_miss_cols, right_list_miss_cols):
    """ return union of data frames with ordered columns by left_df. """
    left_df_all_cols = __add_missing_columns(left_df, left_list_miss_cols)
    right_df_all_cols = __order_df_and_add_missing_cols(right_df, left_df_all_cols.schema.names,
                                                        right_list_miss_cols)
    return left_df_all_cols.union(right_df_all_cols)

def union_d_fs(left_df, right_df):
    """ Union between two dataFrames, if there is a gap of column fields,
     it will append all missing columns as nulls """
    # Check for None input
    if left_df is None:
        raise ValueError('left_df parameter should not be None')
    if right_df is None:
        raise ValueError('right_df parameter should not be None')
    # For DataFrames with equal columns in the same order, do a regular union
    if left_df.schema.names == right_df.schema.names:
        return left_df.union(right_df)
    else:  # Different columns
        # Save dataFrame columns name list as set
        left_df_col_list = set(left_df.schema.names)
        right_df_col_list = set(right_df.schema.names)
        # Diff columns between left_df and right_df
        right_list_miss_cols = list(left_df_col_list - right_df_col_list)
        left_list_miss_cols = list(right_df_col_list - left_df_col_list)
        return __order_and_union_d_fs(left_df, right_df, left_list_miss_cols, right_list_miss_cols)
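
For example, here is a minimal usage sketch. The example data and column names below are invented purely for illustration, and union_d_fs is assumed to be defined as above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Two toy DataFrames with partially overlapping columns.
df_a = spark.createDataFrame([(1, "x")], ["id", "name"])
df_b = spark.createDataFrame([(2, 3.5)], ["id", "score"])

# "score" is appended to df_a as null and "name" to df_b, then the two are unioned.
union_d_fs(df_a, df_b).show()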

0kjbasz6 #17

This modifies Alberto Bonsanto's version to preserve the original column order (the OP implies that the order should match the original tables). Also, the match block was causing an IntelliJ warning.
Here is my version:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}

def unionDifferentTables(df1: DataFrame, df2: DataFrame): DataFrame = {

  val cols1 = df1.columns.toSet
  val cols2 = df2.columns.toSet
  val total = cols1 ++ cols2 // union

  val order = df1.columns ++  df2.columns
  val sorted = total.toList.sortWith((a,b)=> order.indexOf(a) < order.indexOf(b))

  def expr(myCols: Set[String], allCols: List[String]) = {
      allCols.map( {
        case x if myCols.contains(x) => col(x)
        case y => lit(null).as(y)
      })
  }

  df1.select(expr(cols1, sorted): _*).unionAll(df2.select(expr(cols2, sorted): _*))
}
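
For readers on PySpark, here is a rough sketch of the same idea (my own adaptation, so treat it as illustrative): keep df1's columns first, append df2's extra columns after them, and fill whatever a side is missing with nulls before a positional union:

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, lit

def union_different_tables_py(df1: DataFrame, df2: DataFrame) -> DataFrame:
    cols1, cols2 = set(df1.columns), set(df2.columns)
    # df1's columns first, then df2's extra columns, preserving their order.
    ordered = list(df1.columns) + [c for c in df2.columns if c not in cols1]

    def expr(my_cols):
        return [col(c) if c in my_cols else lit(None).alias(c) for c in ordered]

    return df1.select(expr(cols1)).union(df2.select(expr(cols2)))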

ejk8hzay #18

There are many succinct ways to handle this, at a modest cost in performance.

import org.apache.spark.sql.DataFrame

// sparkSession is assumed to be the active SparkSession already in scope
def unionWithDifferentSchema(a: DataFrame, b: DataFrame): DataFrame = {
    sparkSession.read.json(a.toJSON.union(b.toJSON).rdd)
}

This is the function that does the trick: calling toJSON on each DataFrame and unioning the results forms a JSON union, which preserves the ordering and the data types.
The only catch is that toJSON is relatively expensive (though you will probably see no more than a 10-15% slowdown). On the other hand, it keeps the code clean.
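
A rough PySpark equivalent of the same trick might look like this (a sketch only; an active SparkSession named spark and the two input DataFrames df1 and df2 are assumed):

# Serialize both DataFrames to JSON strings, union the strings, and let Spark
# re-infer a merged schema when reading the JSON back. Fields that are absent
# on one side simply come back as nulls.
merged = spark.read.json(df1.toJSON().union(df2.toJSON()))
merged.show()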


4nkexdtk #19

If you are loading the data from files, I think you can just use the read function with a list of files.


# file_paths is a list of files with different schemas
df = spark.read.option("mergeSchema", "true").json(file_paths)

The resulting DataFrame will have the merged set of columns.


xmjla07d #20

I find most of the Python answers here a bit too convoluted if you are simply going with the lit(None) workaround (which is also the only way I know). As an alternative, this might be useful:


from pyspark.sql.functions import lit

# df1 and df2 are assumed to be the given DataFrames from the question.

# Get the columns that each DataFrame is lacking and add them as nulls to the
# respective DataFrame. First do so for df1...
for column in [column for column in df2.columns if column not in df1.columns]:
    df1 = df1.withColumn(column, lit(None))

# ... and then for df2
for column in [column for column in df1.columns if column not in df2.columns]:
    df2 = df2.withColumn(column, lit(None))

Then just perform the union() you wanted to do.
Caution: if the column order differs between df1 and df2, use unionByName()!

result = df1.unionByName(df2)
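
As an aside, if you are on Spark 3.1 or later, unionByName can add the missing columns as nulls by itself via its allowMissingColumns flag, which makes the loops above unnecessary:

# Spark 3.1+ only: columns missing on either side are filled with nulls automatically.
result = df1.unionByName(df2, allowMissingColumns=True)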

9vw9lbht #21

This function takes two DataFrames (df1 and df2) with different schemas and unions them. First we need to bring them to the same schema by adding all of df1's (missing) columns to df2, and vice versa. To add a new empty column to a DataFrame, we need to specify its data type.

import pyspark.sql.functions as F

def union_different_schemas(df1, df2):
    # Get a list of all column names in both dfs
    columns_df1 = df1.columns
    columns_df2 = df2.columns
    # Get a list of the data types of those columns
    data_types_df1 = [i.dataType for i in df1.schema.fields]
    data_types_df2 = [i.dataType for i in df2.schema.fields]
    # Go through all columns in df1 and, if they are not in df2, add them
    # there (with the correct data type)
    for col, typ in zip(columns_df1, data_types_df1):
        if col not in df2.columns:
            df2 = df2.withColumn(col, F.lit(None).cast(typ))
    # Now df2 has all of df1's missing columns; do the same for df1
    for col, typ in zip(columns_df2, data_types_df2):
        if col not in df1.columns:
            df1 = df1.withColumn(col, F.lit(None).cast(typ))
    # df1 and df2 now have the same columns, though not necessarily in the
    # same order, therefore we use unionByName
    combined_df = df1.unionByName(df2)

    return combined_df
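
A quick usage sketch with invented data (as in the earlier example, an active SparkSession named spark is assumed):

df1 = spark.createDataFrame([(1, "a")], ["id", "name"])
df2 = spark.createDataFrame([(2, 9.5)], ["id", "score"])

# Missing columns are added as typed nulls on each side, then the frames are unioned by name.
union_different_schemas(df1, df2).show()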

jhkqcmku #22

A very simple approach: select the columns from both DataFrames in the same order and use unionAll:

from pyspark.sql.functions import lit

df1.select('code', 'date', 'A', 'B', 'C', lit(None).alias('D'), lit(None).alias('E')) \
   .unionAll(df2.select('code', 'date', lit(None).alias('A'), 'B', 'C', 'D', 'E'))
