python 按列分组,然后在pyspark中按元素对数组列求和

fhg3lkii  于 2023-04-10  发布在  Python
关注(0)|答案(1)|浏览(169)

你好,我有一个pyspark dataframe的形式:

CATEGORY    VALUE
0   A           [4, 5, 6]
1   A           [1, 2, 3]
2   B           [7, 8, 9]

我希望我的输出是

CATEGORY    VALUE
0   A           [5, 7, 9]
1   B           [7, 8, 9]

实际的dataframe是~2billion记录,每个数组是~1500个元素,所以这需要尽可能高效,我已经尝试将数组扩展到列,然后groupby在我的示例中工作良好,但我需要一个更有效的解决方案来实现完整的dataframe。
谢谢!

6g8kf2rb

6g8kf2rb1#

要实现所需的输出,最好的办法是使用UDF,它将以行方式工作。

数据准备

s = StringIO("""
category|value
A|4, 5, 6
A|1, 2, 3
B|7, 8, 9
"""
)

df = pd.read_csv(s,delimiter='|')

sparkDF = sql.createDataFrame(df)\
            .withColumn('value',F.split(F.col('value'),',').cast(ArrayType(IntegerType())))

sparkDF.show()

+--------+---------+
|category|    value|
+--------+---------+
|       A|[4, 5, 6]|
|       A|[1, 2, 3]|
|       B|[7, 8, 9]|
+--------+---------+

root
 |-- category: string (nullable = true)
 |-- value: array (nullable = true)
 |    |-- element: integer (containsNull = true)

全要素聚合-自定义项

@F.udf(returnType=ArrayType(IntegerType()))
def custom_arr(inp):
    
    res = np.array(inp)

    return res.sum(axis=0).tolist()

sparkDF = sparkDF.groupBy('category')\
                 .agg(F.collect_list('value').alias('value'))\
                 .withColumn('final_arr',custom_arr(F.col('value')))

sparkDF.show(truncate=False)

+--------+----------------------+---------+
|category|value                 |final_arr|
+--------+----------------------+---------+
|A       |[[4, 5, 6], [1, 2, 3]]|[5, 7, 9]|
|B       |[[7, 8, 9]]           |[7, 8, 9]|
+--------+----------------------+---------+

相关问题