Spark SQL: how to concatenate string rows after grouping by specific columns

r3i60tvu  posted on 2021-05-27  in Spark

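I am writing a Spark application in Java. I have run into a problem: after grouping rows by specific columns, I need to concatenate the strings from the different rows of each group into one value. Any help is appreciated. Thanks.
Input dataset

Expected output dataset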

6vl6ewon  #1

Aggregate with collect_list in the groupBy, then apply the concat_ws function to turn the resulting array into a single delimited string.

df.show(false)
+--------------------------------------+------+---------------+---------------+----------------+-------+
|Errors                                |userid|associationtype|associationrank|associationvalue|sparkId|
+--------------------------------------+------+---------------+---------------+----------------+-------+
|Primary Key Constraint Violated       |3     |Brand5         |error          |Lee             |4      |
|Incorrect datatype in  associationrank|3     |Brand5         |error          |Lee             |4      |
+--------------------------------------+------+---------------+---------------+----------------+-------+

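import org.apache.spark.sql.functions.{col, collect_list, concat_ws}

// Collect every Errors value of a group into an array, then join the array with ", ".
df.groupBy("userid", "associationtype", "associationrank", "associationvalue", "sparkId")
  .agg(collect_list("Errors").as("Errors"))
  .withColumn("Errors", concat_ws(", ", col("Errors")))
  .show(false)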

+------+---------------+---------------+----------------+-------+-----------------------------------------------------------------------+
|userid|associationtype|associationrank|associationvalue|sparkId|Errors                                                                 |
+------+---------------+---------------+----------------+-------+-----------------------------------------------------------------------+
|3     |Brand5         |error          |Lee             |4      |Primary Key Constraint Violated, Incorrect datatype in  associationrank|
+------+---------------+---------------+----------------+-------+-----------------------------------------------------------------------+
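
Since the question is about Java: collect_list, concat_ws and col are also available as static methods on org.apache.spark.sql.functions, so the same chain can be written against a Dataset<Row>. To make clear what the aggregation computes, here is a minimal plain-Java sketch of the same group-then-join logic on in-memory rows. It does not use Spark at all; the class name GroupConcatSketch, the method groupAndJoin, and the String[] row layout are assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupConcatSketch {

    // Assumed row layout (hypothetical, mirrors the table above):
    // {Errors, userid, associationtype, associationrank, associationvalue, sparkId}
    static Map<List<String>, String> groupAndJoin(List<String[]> rows, String separator) {
        return rows.stream().collect(Collectors.groupingBy(
                // Group key: the five non-error columns (plays the role of groupBy).
                row -> Arrays.asList(row[1], row[2], row[3], row[4], row[5]),
                // LinkedHashMap keeps groups in first-seen order.
                LinkedHashMap::new,
                // Gather the Errors values of a group and join them
                // (plays the role of collect_list followed by concat_ws).
                Collectors.mapping(row -> row[0], Collectors.joining(separator))));
    }

    public static void main(String[] args) {
        List<String[]> rows = new ArrayList<>();
        rows.add(new String[] {"Primary Key Constraint Violated", "3", "Brand5", "error", "Lee", "4"});
        rows.add(new String[] {"Incorrect datatype in  associationrank", "3", "Brand5", "error", "Lee", "4"});

        // Prints one line per group: the key columns, then the joined errors.
        groupAndJoin(rows, ", ")
                .forEach((key, errors) -> System.out.println(key + " -> " + errors));
    }
}
```

One difference worth keeping in mind: this sketch joins values in the order the rows are encountered, whereas Spark does not guarantee the element order of collect_list after a shuffle.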
hvvq6cgz  #2

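Try the following code. It groups by sparkid only, collects the distinct errors of each group with collect_set, and keeps the first row of the group for the remaining columns.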

scala> sdf
.groupBy("sparkid")
.agg(collect_set($"errors").as("error_list"),first(struct($"*")).as("data"))
.select($"data.*",concat_ws(",",$"error_list").as("errors_new"))
.show(false)
+-------------------------------------+------+---------------+---------------+----------------+-------+---------------------------------------------------------------------+
|errors                               |userid|associationtype|associationrank|associationvalue|sparkid|errors_new                                                           |
+-------------------------------------+------+---------------+---------------+----------------+-------+---------------------------------------------------------------------+
|Incorrect datatype in associationrank|8     |brand3         |dd             |LeeNew          |7      |Incorrect datatype in associationrank                                |
|Incorrect datatype in associationrank|4     |brand4         |null           |Lee             |3      |Incorrect datatype in associationrank                                |
|Incorrect datatype in associationrank|1     |brand1         |iuy            |Lee             |0      |Incorrect datatype in associationrank                                |
|Primary Key Constraint Violated      |2     |brand1         |something      |Lee             |5      |Primary Key Constraint Violated,Incorrect datatype in associationrank|
|Primary Key Constraint Violated      |2     |brand2         |22             |Lee             |1      |Primary Key Constraint Violated                                      |
|Primary Key Constraint Violated      |3     |brand5         |error          |Lee             |4      |Primary Key Constraint Violated,Incorrect datatype in associationrank|
|Primary Key Constraint Violated      |3     |brand3         |40             |LeeNew          |2      |Primary Key Constraint Violated                                      |
+-------------------------------------+------+---------------+---------------+----------------+-------+---------------------------------------------------------------------+
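
The answer above uses collect_set rather than collect_list, so an error message that occurs several times within a group appears only once in the joined string. A minimal plain-Java sketch of that distinct-then-join behaviour follows. It does not use Spark; the class name DistinctJoinSketch, the method distinctJoin, the two-column row layout, and the duplicated sample row are all assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DistinctJoinSketch {

    // Assumed row layout (hypothetical): {errors, sparkid}.
    // Returns sparkid -> distinct errors joined with the separator.
    static Map<String, String> distinctJoin(List<String[]> rows, String separator) {
        // A set per group drops repeated messages (the role of collect_set).
        Map<String, Set<String>> grouped = new LinkedHashMap<>();
        for (String[] row : rows) {
            grouped.computeIfAbsent(row[1], id -> new LinkedHashSet<>()).add(row[0]);
        }
        // Join each set into one string (the role of concat_ws).
        Map<String, String> joined = new LinkedHashMap<>();
        grouped.forEach((id, errors) -> joined.put(id, String.join(separator, errors)));
        return joined;
    }

    public static void main(String[] args) {
        List<String[]> rows = new ArrayList<>();
        rows.add(new String[] {"Primary Key Constraint Violated", "4"});
        rows.add(new String[] {"Incorrect datatype in associationrank", "4"});
        // Hypothetical duplicate row, added only to show the de-duplication.
        rows.add(new String[] {"Primary Key Constraint Violated", "4"});
        rows.add(new String[] {"Primary Key Constraint Violated", "2"});

        distinctJoin(rows, ",").forEach((id, errors) -> System.out.println(id + " -> " + errors));
    }
}
```

The sketch keeps first-seen order by using a LinkedHashSet; Spark makes no ordering promise for collect_set, so the order of the joined messages should not be relied on there.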
