python - GroupBy a Spark DataFrame and manipulate the aggregated data as a string

oyjwcjzk · published 2023-08-02 in Python

An AWS Glue Spark job performs the transformation. In the example below, I group rows by "item_guid" and "item_name" and aggregate the "option" column into a collect_set. The set is an array, but later I need to map it to a Postgres database, and for that the array has to be converted to a string. So the line

array_to_string_df = grouped_df.withColumn("option", concat_ws(',', col("option")))

converts option into a comma-separated string. For Postgres, however, where the option column has type text[], the string must be wrapped in curly braces and look like this: {90000,86000}
The question: how can I turn the option value into a brace-enclosed string such as "{90000,86000,81000}" in the final transformation step? It seems like it should be a simple trick, but I can't come up with an elegant solution.
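For reference, one way to express the target format is with format_string; a minimal sketch, assuming the grouped_df defined in the code example below (this is not from the original post):

from pyspark.sql.functions import format_string, concat_ws, col

# Join the collected set with commas, then wrap it in curly braces
# so Postgres can parse the value as a text[] literal.
pg_df = grouped_df.withColumn(
    "option", format_string("{%s}", concat_ws(",", col("option")))
)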
Code example:

from pyspark.sql.functions import collect_list, collect_set, concat_ws, col, lit

simpleData = [("001","1122","YPIA_PROD",90000),
    ("002","1122","YPIA_PROD",86000),
    ("003","1122","YPIA_PROD",81000),
    ("004","1122","YPIA_ABC",90000),
    ("005","1133","YPIA_PROD",99000),
    ("006","1133","YPIA_PROD",83000),
    ("007","1144","YPIA_PROD",79000),
    ("008","1144","YPIA_PROD",80000),
    ("009","1144","YPIA_ABC",91000)
]

rdd = spark.sparkContext.parallelize(simpleData)
df = rdd.toDF(["id","item_guid","item_name","option"])
df.show()

grouped_df = df.groupby("item_guid", "item_name").agg(collect_set("option").alias("option"))

array_to_string_df = grouped_df.withColumn("option", concat_ws(',', col("option")))
grouped_df.show()
array_to_string_df.show()


The df.show() outputs:

+---+----------+---------+------+
| id| item_guid|item_name|option|
+---+----------+---------+------+
|001|      1122|YPIA_PROD| 90000|
|002|      1122|YPIA_PROD| 86000|
|003|      1122|YPIA_PROD| 81000|
|004|      1122| YPIA_ABC| 90000|
|005|      1133|YPIA_PROD| 99000|
|006|      1133|YPIA_PROD| 83000|
|007|      1144|YPIA_PROD| 79000|
|008|      1144|YPIA_PROD| 80000|
|009|      1144| YPIA_ABC| 91000|
+---+----------+---------+------+

+----------+---------+--------------------+
| item_guid|item_name|              option|
+----------+---------+--------------------+
|      1133|YPIA_PROD|      [83000, 99000]|
|      1122|YPIA_PROD|[90000, 86000, 81...|
|      1122| YPIA_ABC|             [90000]|
|      1144|YPIA_PROD|      [79000, 80000]|
|      1144| YPIA_ABC|             [91000]|
+----------+---------+--------------------+

+----------+---------+-----------------+
|item_guid |item_name|           option|
+----------+---------+-----------------+
|      1133|YPIA_PROD|      83000,99000|
|      1122|YPIA_PROD|90000,86000,81000|
|      1122| YPIA_ABC|            90000|
|      1144|YPIA_PROD|      79000,80000|
|      1144| YPIA_ABC|            91000|
+----------+---------+-----------------+

dldeef67 · 1#

from pyspark.sql.functions import collect_list, collect_set, concat, concat_ws, col, lit

simpleData = [("001","1122","YPIA_PROD",90000),
    ("002","1122","YPIA_PROD",86000),
    ("003","1122","YPIA_PROD",81000),
    ("004","1122","YPIA_ABC",90000),
    ("005","1133","YPIA_PROD",99000),
    ("006","1133","YPIA_PROD",83000),
    ("007","1144","YPIA_PROD",79000),
    ("008","1144","YPIA_PROD",80000),
    ("009","1144","YPIA_ABC",91000)
]

schema = ["id","item_guid","item_name","option"]
df = spark.createDataFrame(data=simpleData, schema = schema)
#df.printSchema()
df.show(truncate=False)
grouped_df = df.groupby("item_guid", "item_name").agg(collect_set("option").alias("option"))
array_to_string_df = (
    grouped_df
    .withColumn("option", concat_ws(",", col("option")))
    .select(
        col("item_guid"),
        col("item_name"),
        concat(lit("{"), col("option"), lit("}")).alias("option"),
    )
)

array_to_string_df.show()
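Note that collect_set does not guarantee element order. If a stable order matters on the Postgres side, the array can be sorted before joining; a sketch using the standard sort_array function (not part of the original answer):

from pyspark.sql.functions import sort_array, concat_ws, concat, lit, col

# Sort the collected set so the brace-wrapped string has a deterministic
# order, e.g. {81000,86000,90000} instead of an arbitrary permutation.
ordered_df = grouped_df.withColumn(
    "option",
    concat(lit("{"), concat_ws(",", sort_array(col("option"))), lit("}"))
)
ordered_df.show(truncate=False)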


8tntrjer · 2#

concat('{', substring(aggregate(option, '', (acc, cur) -> concat_ws(',', acc, cur)) from 2), '}')

Use it with functions.expr() or the equivalent Column API.
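A minimal runnable sketch of that expression via expr(), assuming the grouped_df from the question; the cast to string is an assumption added here, since the option elements are integers:

from pyspark.sql.functions import expr

# Fold the array into a comma-separated string (the fold starts from '',
# which leaves a leading comma), drop that comma with substring ... from 2,
# then wrap the result in curly braces for the Postgres text[] literal.
pg_array_df = grouped_df.withColumn(
    "option",
    expr(
        "concat('{', substring(aggregate(option, '', "
        "(acc, cur) -> concat_ws(',', acc, cast(cur as string))) from 2), '}')"
    ),
)
pg_array_df.show(truncate=False)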
