PySpark: how do I convert a column containing an array of dicts into columns without knowing the dict contents?

xxe27gdn  posted on 2023-03-17 in Spark

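I have a DataFrame in which two columns each contain a list of dicts (an array of structs), and I have been trying, without success, to explode them into columns.
This is my schema: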

 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |    |    |-- origin: string (nullable = true)
 |-- createdAt: long (nullable = true)
 |-- modules: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- detected: struct (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- id: integer (nullable = true)
 |    |    |    |-- reason: string (nullable = true)
 |    |    |    |-- state: string (nullable = true)
 |    |    |    |-- score: integer (nullable = true)
 |    |    |    |-- level: string (nullable = true)
 |-- id: string (nullable = true)

It looks like this:

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|data                                                                  |createdAt    |modules                                                                                                                                    |id                                  |
+----------------------------------------------------------------------+-------------+-------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------+
|[{data_point_1, false, METADATA}, {data_point_2, some_string, DEVICE}]|1678148428468|[{ANOTHER_DUMMY, {null, null, null, null, null, null}}, {DUMMY, {dummy_user_agent, 1, Rule for integration tests, OPERATIONAL, 500, HIGH}}]|70ef58bf-b160-4abd-97c1-aa4780e74e1b|
|[{data_point_1, false, METADATA}, {data_point_3, 0, USER}]            |1678148428495|[{ANOTHER_DUMMY, {null, null, null, null, null, null}}, {DUMMY, {null, null, null, null, null, null}}]                                     |6ab33e95-dd94-4c95-b00f-edfe97d6f3d1|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

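About the data itself: in the data column, the name values can differ from row to row, as in the example. The modules column should have the same kind of structure, except that each element contains another dict, detected. The output I want looks like this: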

+------------+-------------------+------------+-------------------+------------+-------------------+-------------+------------------+----------------+--------------------+-------------------+-------------------+-------------------+----------------+--------+--------------------------+-----------+-----------+-----------+------------------------------------+
|data_point_1|data_point_1_origin|data_point_2|data_point_2_origin|data_point_3|data_point_3_origin|createdAt    |ANOTHER_DUMMY_name|ANOTHER_DUMMY_id|ANOTHER_DUMMY_reason|ANOTHER_DUMMY_state|ANOTHER_DUMMY_score|ANOTHER_DUMMY_level|DUMMY_name      |DUMMY_id|DUMMY_reason              |DUMMY_state|DUMMY_score|DUMMY_level|id                                  |
+------------+-------------------+------------+-------------------+------------+-------------------+-------------+------------------+----------------+--------------------+-------------------+-------------------+-------------------+----------------+--------+--------------------------+-----------+-----------+-----------+------------------------------------+
|false       |METADATA           |some_string |DEVICE             |null        |null               |1678148428468|null              |null            |null                |null               |null               |null               |dummy_user_agent|1       |Rule for integration tests|OPERATIONAL|500        |HIGH       |70ef58bf-b160-4abd-97c1-aa4780e74e1b|
|false       |METADATA           |null        |null               |0           |USER               |1678148428495|null              |null            |null                |null               |null               |null               |null            |null    |null                      |null       |null       |null       |6ab33e95-dd94-4c95-b00f-edfe97d6f3d1|
+------------+-------------------+------------+-------------------+------------+-------------------+-------------+------------------+----------------+--------------------+-------------------+-------------------+-------------------+----------------+--------+--------------------------+-----------+-----------+-----------+------------------------------------+

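Is there a way to do something like this when I don't know the names inside the dicts of the data array, and when I also need to explode nested dicts like the ones in modules?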

xdyibdwo  #1

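Here is my solution:
I save the array column together with the uuid into a new DataFrame, then explode the array column into rows, so that each row holds a single map instead of the original array of maps. Creating columns from a map is straightforward (most of the interesting code is in SparkUtils):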

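from typing import List

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import DataType


class SparkUtils: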
    def __init__(self, spark: SparkSession):
        self.spark = spark

    def format_json_column(self, df: DataFrame, column: str, schema: DataType = None) -> DataFrame:
        if df.schema[column].dataType.typeName() == "string":
            json_schema = schema or self.spark.read.json(df.rdd.map(lambda row: row[column])).schema
            formatted_df = df.withColumn(column, from_json(df[column], json_schema))
            return formatted_df
        return df

    @staticmethod
    def horizontal_concat(df1: DataFrame, df2: DataFrame, df_id: str):
        return df1.join(df2, df_id, "outer")

    @staticmethod
    def add_prefix_and_suffix_to_columns(df: DataFrame, suffix: str = '', prefix: str = '',
                                         to_ignore: list = None) -> DataFrame:
        to_ignore = to_ignore if to_ignore is not None else []
        return df.select([col(column).alias(f'{prefix}{column}{suffix}') if column not in to_ignore else column
                          for column in df.columns])

    @staticmethod
    def rename_columns(df: DataFrame, columns_mapper: dict) -> DataFrame:
        return df.select([col(column).alias(columns_mapper.get(column)) if column in columns_mapper else column
                          for column in df.columns])

    def explode_dict_column(self, df: DataFrame, column: str, df_id: str, schema: DataType = None,
                            prefix: str = None) -> DataFrame:
        formatted_column_df = self.format_json_column(df, column, schema).select([f'{column}.*', df_id])
        if prefix:
            formatted_column_df = self.add_prefix_and_suffix_to_columns(formatted_column_df, prefix=prefix,
                                                                        to_ignore=[df_id])
        return self.horizontal_concat(df, formatted_column_df, df_id).drop(column)

    def explode_array_column(self, df: DataFrame, column: str, df_id: str, pivot_by: str,
                             schema: DataType = None) -> DataFrame:
        # Parse the column if it is still a JSON string, then turn each array element into its own row.
        formatted_column_df = self.format_json_column(df, column, schema)
        exploded_column_df = formatted_column_df.withColumn(column, explode(column)).select([f'{column}.*', df_id])
        # Take the max of every remaining field for each (df_id, pivot value) pair.
        aggregation_by = {name: "max" for name in exploded_column_df.columns if name not in [df_id, pivot_by]}
        return exploded_column_df.groupBy(df_id).pivot(pivot_by).agg(aggregation_by)
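
The column names that come out of `explode_array_column` explain most of the renaming done later in the job. Below is a small pure-Python model of that naming, not Spark itself. It assumes the usual behaviour of `groupBy(...).pivot(...).agg(dict)`: with a single aggregation each new column is named after the pivot value alone, and with several aggregations the name is `<pivot value>_<func>(<column>)`. `build_aggregations`, `expected_pivot_columns` and the id name `module_id` are made up for illustration, and the real column order may differ.

```python
from typing import Dict, List


def build_aggregations(columns: List[str], df_id: str, pivot_by: str) -> Dict[str, str]:
    """Mirror of `aggregation_by`: aggregate every column except the id and the pivot key."""
    return {c: "max" for c in columns if c not in (df_id, pivot_by)}


def expected_pivot_columns(df_id: str, pivot_values: List[str],
                           aggregations: Dict[str, str]) -> List[str]:
    """Model (an assumption, see above) of the column names after the pivot."""
    if len(aggregations) == 1:
        # A single aggregation: each pivoted column is named after the pivot value alone.
        return [df_id] + list(pivot_values)
    # Several aggregations: "<pivot value>_<func>(<column>)".
    return [df_id] + [f"{value}_{func}({column})"
                      for value in pivot_values
                      for column, func in aggregations.items()]


# `data` elements have name/value/origin, so two fields are aggregated per data point.
data_aggs = build_aggregations(["name", "value", "origin", "id"], "id", "name")
print(expected_pivot_columns("id", ["data_point_1", "data_point_2"], data_aggs))

# `modules` elements have name/detected, so only the `detected` struct is aggregated.
module_aggs = build_aggregations(["name", "detected", "module_id"], "module_id", "name")
print(expected_pivot_columns("module_id", ["ANOTHER_DUMMY", "DUMMY"], module_aggs))
```

Under that assumption the data pivot yields names such as `data_point_1_max(value)`, which still need cleaning up, while the modules pivot yields one struct column per module name, which can then be expanded with a prefix.
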
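

# FEATURES_COLUMN, DATA_COLUMN, MODULES_COLUMN, the PIVOT_* and *_SUFFIX* constants, the rename mappers
# and JOB_SCHEMAS are module-level values that are not shown here.
class Job:
    def __init__(self, spark: SparkSession, df: DataFrame, topic: str):
        self.spark: SparkSession = spark
        self.df = df
        self.topic_name = topic
        self.spark_utils = SparkUtils(self.spark)
        self.job_schemas = JOB_SCHEMAS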

    def _explode_features_column(self):
        self.df = self.spark_utils.explode_dict_column(self.df, FEATURES_COLUMN, 'id')

    @staticmethod
    def _get_fixed_data_column_names(columns: List[str]) -> list:
        return [column.replace(ORIGIN_SUFFIX_AFTER_EXPLODE, PROPER_ORIGIN_SUFFIX)
                if column.endswith(ORIGIN_SUFFIX_AFTER_EXPLODE) else column.replace(VALUE_SUFFIX_AFTER_EXPLODE, '')
                for column in columns]

    def _explode_data_column(self):
        exploded_data = self.spark_utils.explode_array_column(self.df, DATA_COLUMN, 'id', PIVOT_DATA_BY,
                                                              self.job_schemas.get(DATA_COLUMN))
        exploded_data = exploded_data.drop('id_max(origin)', 'id_max(value)')
        fixed_data_column_names = self._get_fixed_data_column_names(exploded_data.columns)
        exploded_data = exploded_data.toDF(*fixed_data_column_names)
        self.df = self.spark_utils.horizontal_concat(self.df, exploded_data, 'id').drop(DATA_COLUMN)

    def _get_exploded_module_column(self, exploded_modules: DataFrame, modules_id) -> DataFrame:
        modules = [column for column in exploded_modules.columns if column != modules_id]
        for module in modules:
            prefix = f'{module}_'
            exploded_modules = self.spark_utils.explode_dict_column(exploded_modules, module, modules_id, prefix=prefix)
        return exploded_modules

    def _explode_modules_column(self):
        self.df = self.spark_utils.rename_columns(self.df, RENAME_ID_MAPPER)
        modules_id = RENAME_ID_MAPPER.get('id')
        exploded_modules = self.spark_utils.explode_array_column(self.df, MODULES_COLUMN, modules_id,
                                                                 PIVOT_MODULES_BY, self.job_schemas.get(MODULES_COLUMN))
        exploded_modules = self._get_exploded_module_column(exploded_modules, modules_id)
        self.df = self.spark_utils.horizontal_concat(self.df, exploded_modules, modules_id).drop(MODULES_COLUMN)
        self.df = self.spark_utils.rename_columns(self.df, REVERT_RENAME_MAPPER)

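    def _fix_df(self):
        # fix_df_function is not defined in this snippet; it is presumably a list of the _explode_* methods above.
        for fix_function in self.fix_df_function:
            fix_function()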
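
For readers who only need the data column, the same idea can be condensed into one function. This is a minimal sketch rather than the code above: the constant values are guesses (the answer uses these names without showing their definitions), and `fix_data_column_names` and `flatten_data_column` are hypothetical helpers. It assumes the schema from the question, with `id` unique per row.

```python
from typing import List

# Hypothetical values: the answer uses these names but never shows their definitions.
DATA_COLUMN = "data"
PIVOT_DATA_BY = "name"
VALUE_SUFFIX_AFTER_EXPLODE = "_max(value)"
ORIGIN_SUFFIX_AFTER_EXPLODE = "_max(origin)"
PROPER_ORIGIN_SUFFIX = "_origin"


def fix_data_column_names(columns: List[str]) -> List[str]:
    """Turn 'x_max(origin)' into 'x_origin' and 'x_max(value)' into 'x'."""
    fixed = []
    for column in columns:
        if column.endswith(ORIGIN_SUFFIX_AFTER_EXPLODE):
            fixed.append(column[: -len(ORIGIN_SUFFIX_AFTER_EXPLODE)] + PROPER_ORIGIN_SUFFIX)
        elif column.endswith(VALUE_SUFFIX_AFTER_EXPLODE):
            fixed.append(column[: -len(VALUE_SUFFIX_AFTER_EXPLODE)])
        else:
            fixed.append(column)
    return fixed


def flatten_data_column(df, df_id: str = "id"):
    """Explode `data` into rows, pivot on the item name, and join the result back."""
    # Imported lazily so the renaming helper above works without a Spark installation.
    from pyspark.sql import functions as F

    # One row per array element, with the struct fields as top-level columns.
    exploded = (
        df.select(df_id, F.explode(DATA_COLUMN).alias("item"))
        .select(df_id, "item.*")
    )
    # One group of columns per distinct name; no name has to be known in advance.
    pivoted = (
        exploded.groupBy(df_id)
        .pivot(PIVOT_DATA_BY)
        .agg({"value": "max", "origin": "max"})
    )
    renamed = pivoted.toDF(*fix_data_column_names(pivoted.columns))
    # A left join keeps rows whose `data` array was null or empty (explode drops them).
    return df.drop(DATA_COLUMN).join(renamed, on=df_id, how="left")


print(fix_data_column_names(["id", "data_point_1_max(value)", "data_point_1_max(origin)"]))
```

With an active session, `flatten_data_column(df)` should give one `data_point_*` / `data_point_*_origin` pair per distinct name. Because `pivot` is called without an explicit list of values, Spark first has to compute the distinct names, which costs an extra pass over the data.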
