pyspark 扁平嵌套Spark Dataframe

5q4ezhmt  于 2023-03-17  发布在  Spark
关注(0)|答案(6)|浏览(197)

有没有一种方法可以扁平化一个任意嵌套的Spark Dataframe?我看到的大多数工作都是为特定的模式编写的,我希望能够通用地扁平化一个具有不同嵌套类型的Dataframe(例如StructType,ArrayType,MapType等)。
假设我有一个模式:

StructType(List(StructField(field1,...), StructField(field2,...), ArrayType(StructType(List(StructField(nested_field1,...), StructField(nested_field2,...)),nested_array,...)))

希望将其调整为具有如下结构的平面表:

field1
field2
nested_array.nested_field1
nested_array.nested_field2

顺便说一句,寻找Pyspark的建议,但Spark的其他口味也很受欢迎。

uxh89sit

uxh89sit1#

这个问题可能有点老了,但是对于那些仍然在寻找解决方案的人来说,可以使用select * 内联地扁平化复杂的数据类型:
首先让我们创建嵌套 Dataframe :

from pyspark.sql import HiveContext
hc = HiveContext(sc)
nested_df = hc.read.json(sc.parallelize(["""
{
  "field1": 1, 
  "field2": 2, 
  "nested_array":{
     "nested_field1": 3,
     "nested_field2": 4
  }
}
"""]))

现在把它压平:

flat_df = nested_df.select("field1", "field2", "nested_array.*")

您可以在这里找到有用的示例:https://docs.databricks.com/delta/data-transformation/complex-types.html
如果嵌套数组太多,可以用途:

flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
flat_df = nested_df.select(*flat_cols, *[c + ".*" for c in nested_cols])
ghg1uchk

ghg1uchk2#

这将展平同时具有structtype和****arraytype的嵌套df。通常在通过Json阅读数据时很有用。在此https://stackoverflow.com/a/56533459/7131019上进行了改进

from pyspark.sql.types import *
from pyspark.sql import functions as f

def flatten_structs(nested_df):
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:
        
        parents, df = stack.pop()
        
        array_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:5] == "array"
        ]
        
        flat_cols = [
            f.col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]
        
        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))
        
    return nested_df.select(columns)

def flatten_array_struct_df(df):
    
    array_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:5] == "array"
        ]
    
    while len(array_cols) > 0:
        
        for array_col in array_cols:
            
            cols_to_select = [x for x in df.columns if x != array_col ]
            
            df = df.withColumn(array_col, f.explode(f.col(array_col)))
            
        df = flatten_structs(df)
        
        array_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:5] == "array"
        ]
    return df

flat_df = flatten_array_struct_df(df)

**

o3imoua4

o3imoua43#

这是我的最后一个方法:
1)将 Dataframe 中的行Map到dict的rdd。在线查找合适的python代码来扁平化dict。

flat_rdd = nested_df.map(lambda x : flatten(x))

其中

def flatten(x):
  x_dict = x.asDict()
  ...some flattening code...
  return x_dict

2)将RDD[dict]转换回 Dataframe

flat_df = sqlContext.createDataFrame(flat_rdd)
sgtfey8w

sgtfey8w4#

我开发了一种递归方法来扁平化任何嵌套的DataFrame。
在GitHub平台上实现了AWS数据管理器。
Spark支持在包中已经过时了,但是代码库仍然有用。

sycxhyv7

sycxhyv75#

我是这样写的:

def to_flatten(df):
   for type in df.schema:
       if type.needConversion():
           df = df.withColumn(f"{type.name}.<<your_inner_column>>", 
                             df[f"{type.name}.<<your_inner_column>>"])
   return df

解决方案不会删除现有列。
嵌套列的类型为StructType,对于StructType,needConversion()方法返回True。
(注意,对于其他一些类型,needConversion()返回True,但它们不是我的 Dataframe 的一部分)
对我来说,它产生了与使用堆栈的解决方案类似的解决方案:https://stackoverflow.com/a/65256632/21404451 .

mi7gmzs6

mi7gmzs66#

下面的要点将使嵌套JSON的结构扁平化,

import typing as T

import cytoolz.curried as tz
import pyspark

def schema_to_columns(schema: pyspark.sql.types.StructType) -> T.List[T.List[str]]:
    """
    Produce a flat list of column specs from a possibly nested DataFrame schema
    """

    columns = list()

    def helper(schm: pyspark.sql.types.StructType, prefix: list = None):

        if prefix is None:
            prefix = list()

        for item in schm.fields:
            if isinstance(item.dataType, pyspark.sql.types.StructType):
                helper(item.dataType, prefix + [item.name])
            else:
                columns.append(prefix + [item.name])

    helper(schema)

    return columns

def flatten_frame(frame: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:

    aliased_columns = list()

    for col_spec in schema_to_columns(frame.schema):
        c = tz.get_in(col_spec, frame)
        if len(col_spec) == 1:
            aliased_columns.append(c)
        else:
            aliased_columns.append(c.alias(':'.join(col_spec)))

    return frame.select(aliased_columns)

然后可以将嵌套数据展开为
flatten_data = flatten_frame(nested_df)
这将给予你一个扁平的 Dataframe 。
要点取自https://gist.github.com/DGrady/b7e7ff3a80d7ee16b168eb84603f5599

相关问题