将python dict的pyspark pipelinerdd解压缩到pysparkDataframe

gkl3eglg  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(420)

我´我使用flatmap来解析Dataframe,它工作得很好,但是我´我无法将最终结果重塑为多列数据集。如何解析这个rdd?这是我在flatmap之后的结果的示例行:

[Row(XXXX-XXXX-XXXX-XXXXX-XXXXXX={'m_ci_id': 'XXXX-XXXX-XXXX-XXXXX-XXXXXX', 'ci_id': 'XXXX-XXXX-XXXX-XXXXX-XXXXXX', 'pp_breaker_power_phase': 'L1_L2', 'pp_breaker_poles': 2, 'pp_breaker_panel_circuit_number': 2, 'cp_ci_id': None, 'cp_value': None, 'phase': 'L1', 'pole': 2})]

我´我传递一个Dataframe,其中的列与dict中看到的列相同,这是我在flatmap中使用的函数:

def get_poles_phases(row):
    """
    :param row:
    :return:
    """
    new_rows = []
    initial_pole = row.pp_breaker_panel_circuit_number
    phases = row.pp_breaker_power_phase.split('_')

    for _ in range(row.pp_breaker_poles):
        temp = row.asDict()
        temp['phase'] = phases[_]
        temp['pole'] = initial_pole

        if row.cp_value != 'Phase Grouping':
            initial_pole += 2
        else:
            logger.error('Panel configuration not recognized.')

        new_rows.append(row(temp))

    return new_rows

我尝试使用structfields的模式,但没有成功´不起作用

cols = [StructField('m_ci_id', StringType(), True),
         StructField('ci_id', StringType(), True),
         StructField('pp_breaker_power_phase', StringType(), True),
         StructField('pp_breaker_poles', StringType(), True),
         StructField('pp_breaker_panel_circuit_number', StringType(), True),
         StructField('cp_ci_id', StringType(), True),
         StructField('cp_value', StringType(), True),
         StructField('phase', StringType(), True),
         StructField('pole', StringType(), True)]

schema = StructType(cols)
poles_phases = poles_phases.toDF(schema)

我还尝试传递列名列表。

poles_phases = poles_phases.toDF(['m_ci_id', 'ci_id', 'pp_breaker_power_phase', 'pp_breaker_poles', 'pp_breaker_panel_circuit_number', 'cp_ci_id', 'cp_value', 'phase', 'pole'])

我怀疑这不起作用,因为我得到的rdd只有一列,但我不知道如何解析出单个dict,以便模式匹配。

hsgswve4

hsgswve41#

我想出来了:

from pyspark.sql import Row
poles_phases = poles_phases.map(lambda row: Row(**list(row.asDict().values())[0]))

这是通过解包value dict来构建新行

poles_phases = poles_phases.toDF(['m_ci_id', 'ci_id', 'pp_breaker_power_phase', 'pp_breaker_poles', 'pp_breaker_panel_circuit_number', 'cp_ci_id', 'cp_value', 'phase', 'pole'])

如果你有 None 模式推理可能会失败,因此需要显式声明它,例如。,

cols = [StructField('m_ci_id', StringType(), True),
        StructField('ci_id', StringType(), True),
        StructField('pp_breaker_power_phase', StringType(), True),
        StructField('pp_breaker_poles', StringType(), True),
        StructField('pp_breaker_panel_circuit_number', StringType(), True),
        StructField('cp_ci_id', StringType(), True),
        StructField('cp_value', StringType(), True),
        StructField('phase', StringType(), True),
        StructField('pole', StringType(), True)]

schema = StructType(cols)
poles_phases = poles_phases.toDF(schema)

相关问题