pyspark 函数分解器输入应为数组或Map类型,而不是结构

eni9jsuy  于 2023-01-01  发布在  Spark
关注(0)|答案(3)|浏览(228)

我有以下数据。

data = [
    [
        "2022-12-12",
        ["IND", "u1", [["auction_1", [[1,20], [2,12]]], ["auction_2", [[1,5], [2,7]]]]],
    ],
    [
        "2022-12-12",
        ["USA", "u2", [["auction_1", [[1,8], [2,12]]], ["auction_2", [[1,11], [2,4]]]]],
    ],
]

我有以下模式
x一个一个一个一个x一个一个二个一个x一个一个三个一个
我想有数据在下面的格式作进一步分析.

date, country, userId, refferalId, action, amountSpent, timeSpent
2022-12-31, IND, 123, 123213,      action1, 5,          56
display(df.select(F.explode("data")))
# cannot resolve 'explode(data)' due to data type mismatch: input to function explode should be an array or map type

任何帮助都将不胜感激。
如果不能分解任何StructType,如何实现上述数据格式?
我也看了这些问题,但没有得到太多帮助-〉Error while exploding a struct column in Spark

wwtsj6pe

wwtsj6pe1#

您必须分解数据。用户:

df.select('date', 'data.country', 'data.userId', F.explode('data.users').alias('info'))

对于这些操作,您需要如下所示的查询(在分解data.users之后):

.select('date', 'country', 'userId', 'info.refferalId', F.explode('actions').alias('actionInfo'))

但是因为你把动作定义为结构体,所以它不能被分解,如果你把它的模式改为列表,代码就能正常工作

2ic8powd

2ic8powd2#

这基本上是一项需要转换大量数据以使其成为所需形式的任务,需要结合使用pyspark.sql.functions才能得到所需的形式。
如果我们从您的df开始:

output = df.select("date", "data.country", "data.userId", explode(col("data.users")).alias("users")) \
           .select("date", "country", "userId", "users.*") \
           .withColumn("actions", explode(array(
               struct("actions.action1.*", lit("action1").alias("action")),
               struct("actions.action2.*", lit("action2").alias("action"))
               )
           )) \
           .select("date", "country", "userId", "refferalId", "actions.*")

output.printSchema()
root
 |-- date: string (nullable = true)
 |-- country: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- refferalId: string (nullable = true)
 |-- amountSpent: long (nullable = true)
 |-- timeSpent: long (nullable = true)
 |-- action: string (nullable = false)

output.show()
+----------+-------+------+----------+-----------+---------+-------+
|      date|country|userId|refferalId|amountSpent|timeSpent| action|
+----------+-------+------+----------+-----------+---------+-------+
|2022-12-12|    IND|    u1| auction_1|          1|       20|action1|
|2022-12-12|    IND|    u1| auction_1|          2|       12|action2|
|2022-12-12|    IND|    u1| auction_2|          1|        5|action1|
|2022-12-12|    IND|    u1| auction_2|          2|        7|action2|
|2022-12-12|    USA|    u2| auction_1|          1|        8|action1|
|2022-12-12|    USA|    u2| auction_1|          2|       12|action2|
|2022-12-12|    USA|    u2| auction_2|          1|       11|action1|
|2022-12-12|    USA|    u2| auction_2|          2|        4|action2|
+----------+-------+------+----------+-----------+---------+-------+

每个转换的操作:

  • 第一个select语句解包data结构并分解data.users数组
  • 其次,select语句解包users结构
  • 第三,withColumn语句稍微复杂一些,此时我们有两个结构体(action1action2),它们有相同的模式,我们在这里做的是:
  • actions列添加一个文本列action,值为action1action2
  • 使用array函数将这两个相似的列放入数组中
  • 分解数组
  • 第四,select语句用于展开我们创建的actions结构体

希望这有帮助!

c2e8gylq

c2e8gylq3#

问题是你不能分解struct,你只能分解数组或map,你需要做的第一步是分解data.users(不仅仅是数据),你可以这样做:

users = df\
    .withColumn("s", F.explode("data.users"))\
    .select("date", "data.country", "data.userId", "s.*")
users.show()
+----------+-------+------+----------+------------------+
|      date|country|userId|refferalId|           actions|
+----------+-------+------+----------+------------------+
|2022-12-12|    IND|    u1| auction_1|{{1, 20}, {2, 12}}|
|2022-12-12|    IND|    u1| auction_2|  {{1, 5}, {2, 7}}|
|2022-12-12|    USA|    u2| auction_1| {{1, 8}, {2, 12}}|
|2022-12-12|    USA|    u2| auction_2| {{1, 11}, {2, 4}}|
+----------+-------+------+----------+------------------+

从那里,你想要分解动作,但是和以前一样,你不能分解结构体。为了克服这个问题,你可以把它转换成一个结构体数组。

users\
    .withColumn("actions", F.array(
        [ F.struct(
            F.lit(f"action{i}").alias("action"),
            F.col("actions")[f"action{i}"].alias("meta")
        ) for i in [1, 2] ]
    ))\
    .withColumn("action", F.explode("actions"))\
    .select("date", "country", "userId", "refferalId", "action.action", "action.meta.*")\
    .show()
+----------+-------+------+----------+-------+-----------+---------+
|      date|country|userId|refferalId| action|amountSpent|timeSpent|
+----------+-------+------+----------+-------+-----------+---------+
|2022-12-12|    IND|    u1| auction_1|action1|          1|       20|
|2022-12-12|    IND|    u1| auction_1|action2|          2|       12|
|2022-12-12|    IND|    u1| auction_2|action1|          1|        5|
|2022-12-12|    IND|    u1| auction_2|action2|          2|        7|
|2022-12-12|    USA|    u2| auction_1|action1|          1|        8|
|2022-12-12|    USA|    u2| auction_1|action2|          2|       12|
|2022-12-12|    USA|    u2| auction_2|action1|          1|       11|
|2022-12-12|    USA|    u2| auction_2|action2|          2|        4|
+----------+-------+------+----------+-------+-----------+---------+

相关问题