pyspark: converting a nested struct field to a JSON string

zujrkrfu · asked on 2023-01-12 · in Spark
Follow (0) | Answers (2) | Views (217)

I am trying to ingest some Mongo collections into BigQuery using PySpark. The schema looks like this:

root
 |-- groups: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- my_field: struct (nullable = true)
 |    |    |    |-- **{ mongo id }**: struct (nullable = true)
 |    |    |    |    |-- A: timestamp (nullable = true)
 |    |    |    |    |-- B: string (nullable = true)
 |    |    |    |    |-- C: struct (nullable = true)
 |    |    |    |    |    |-- abc: boolean (nullable = true)
 |    |    |    |    |    |-- def: boolean (nullable = true)
 |    |    |    |    |    |-- ghi: boolean (nullable = true)
 |    |    |    |    |    |-- xyz: boolean (nullable = true)

The problem is that we store IDs inside my_field, and every group has its own ID, so when I import everything into BigQuery I end up with a new column for each ID. I would like to convert my_field to a string and store all of its nested fields as JSON or something similar. But when I try to cast it, I get this error:

temp_df = temp_df.withColumn("groups.my_field", col("groups.my_field").cast('string'))

TypeError: Column is not iterable

What am I missing?

nqwrtyyt · answer #1

It turns out that in order to append/remove/rename a nested field you need to change the schema; I did not know that. Below is my answer. I copied the code from https://stackoverflow.com/a/48906217/984114 and modified it so it works with my schema.

Here is the modified version of "exclude_nested_field":

from pyspark.sql.types import ArrayType, StringType, StructField, StructType

def change_nested_field_type(schema, fields_to_change, parent=""):
    new_schema = []

    if isinstance(schema, StringType):
        return schema

    for field in schema:
        full_field_name = field.name

        if parent:
            full_field_name = parent + "." + full_field_name

        if full_field_name not in fields_to_change:
            if isinstance(field.dataType, StructType):
                # Recurse into nested structs
                inner_schema = change_nested_field_type(field.dataType, fields_to_change, full_field_name)
                new_schema.append(StructField(field.name, inner_schema))
            elif isinstance(field.dataType, ArrayType):
                # Recurse into the array's element type
                inner_schema = change_nested_field_type(field.dataType.elementType, fields_to_change, full_field_name)
                new_schema.append(StructField(field.name, ArrayType(inner_schema)))
            else:
                new_schema.append(StructField(field.name, field.dataType))
        else:
            # Here we change the field type to String
            new_schema.append(StructField(field.name, StringType()))

    return StructType(new_schema)

And here is how I call the function:

from pyspark.sql.functions import from_json, to_json

# Rebuild the element schema of "groups" with my_field changed to string
new_schema = ArrayType(change_nested_field_type(df.schema["groups"].dataType.elementType, ["my_field"]))

# Round-trip through JSON so Spark re-parses "groups" with the new schema
df = df.withColumn("json", to_json("groups")).drop("groups")
df = df.withColumn("groups", from_json("json", new_schema)).drop("json")
az31mfrm · answer #2

I needed a generic solution that can handle casting nested columns at an arbitrary depth.

from typing import Dict

from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql.functions import col, from_json, to_json
from pyspark.sql.types import StructType, ArrayType, StringType, StructField, _all_atomic_types

def apply_nested_column_casts(
    schema: StructType, column_cast: Dict[str, str], parent: str
) -> StructType:
    new_schema = []

    if isinstance(schema, StringType):
        return schema

    for field in schema:
        full_field_name = field.name

        if parent:
            full_field_name = parent + "." + full_field_name

        if full_field_name not in column_cast:
            if isinstance(field.dataType, StructType):
                inner_schema = apply_nested_column_casts(
                    field.dataType, column_cast, full_field_name
                )
                new_schema.append(StructField(field.name, inner_schema))
            elif isinstance(field.dataType, ArrayType):
                inner_schema = apply_nested_column_casts(
                    field.dataType.elementType, column_cast, full_field_name
                )
                new_schema.append(StructField(field.name, ArrayType(inner_schema)))
            else:
                new_schema.append(StructField(field.name, field.dataType))
        else:
            # Change the field to the requested type; _all_atomic_types is a
            # private pyspark mapping from type name (e.g. "long") to its class
            cast_type = _all_atomic_types[column_cast[full_field_name]]
            new_schema.append(StructField(field.name, cast_type()))

    return StructType(new_schema)

def apply_column_casts(
    df: SparkDataFrame, column_casts: Dict[str, str]
) -> SparkDataFrame:
    for col_name, cast_to in column_casts.items():
        splitted_col_name = col_name.split(".")

        if len(splitted_col_name) == 1:
            df = df.withColumn(col_name, col(col_name).cast(cast_to))
        else:
            nested_field_parent_field = splitted_col_name[0]
            nested_field_parent_type = df.schema[nested_field_parent_field].dataType
            column_cast = {col_name: cast_to}
            if isinstance(nested_field_parent_type, StructType):
                new_schema = apply_nested_column_casts(
                    nested_field_parent_type, column_cast, nested_field_parent_field
                )
            elif isinstance(nested_field_parent_type, ArrayType):
                new_schema = ArrayType(
                    apply_nested_column_casts(
                        nested_field_parent_type.elementType,
                        column_cast,
                        nested_field_parent_field,
                    )
                )
            else:
                # Guard against parents that are neither structs nor arrays,
                # which would otherwise leave new_schema undefined below
                raise TypeError(
                    f"Cannot cast nested fields of {nested_field_parent_type}"
                )

            tmp_json = f"{nested_field_parent_field}_json"

            df = df.withColumn(tmp_json, to_json(nested_field_parent_field)).drop(
                nested_field_parent_field
            )
            df = df.withColumn(
                nested_field_parent_field, from_json(tmp_json, new_schema)
            ).drop(tmp_json)
    return df

You can also express nested column casts in dot notation, like this:

column_casts = {
    "col_a": "string",
    "col_b.nested_col": "double",
    "col_b.nested_struct_col.some_col": "long", 
}

df = apply_column_casts(df, column_casts)
