Apache Spark: create a column of Seq[String] values from a column of Seq[String]

ndh0cuux asked on 2022-12-13 in Apache

The input DataFrame is as follows:

inputDF =

+---------------------+
|days (seq[String])   |
+---------------------+
|[sat, sun]           |
|[mon, wed]           |
|[fri]                |
|[fri, sat]           |
|[mon, sun, sat]      |
+---------------------+

I want to get an outputDF with a column that contains all the distinct strings present in the days column.

outputDF =

+---------------------+----------------------------+
|days (seq[String])   |all days (seq[String])      |
+---------------------+----------------------------+
|[sat, sun]           |[sat, sun, mon, wed, fri]   |
|[mon, wed]           |[sat, sun, mon, wed, fri]   |
|[fri]                |[sat, sun, mon, wed, fri]   |
|[fri, sat]           |[sat, sun, mon, wed, fri]   |
|[mon, sun, sat]      |[sat, sun, mon, wed, fri]   |
+---------------------+----------------------------+

How can I do this in Scala/Spark?

sz81bmfz1#

Assuming this is our input, named dataset:

+---------------+
|days           |
+---------------+
|[sat, sun]     |
|[mon, wed]     |
|[fri]          |
|[fri, sat]     |
|[mon, sun, sat]|
+---------------+

We can get the following output:

+---------------+-------------------------+
|days           |all_days                 |
+---------------+-------------------------+
|[sat, sun]     |[fri, sat, sun, mon, wed]|
|[mon, wed]     |[fri, sat, sun, mon, wed]|
|[fri]          |[fri, sat, sun, mon, wed]|
|[fri, sat]     |[fri, sat, sun, mon, wed]|
|[mon, sun, sat]|[fri, sat, sun, mon, wed]|
+---------------+-------------------------+

Using the code below:

import org.apache.spark.sql.functions._

// First we add a constant ID (skip this if you already have a suitable grouping key)
val withId = dataset.withColumn("id", lit(1))

// Group by the id, flatten the day arrays into one, and keep only the distinct values
val collected = withId
  .groupBy("id")
  .agg(array_distinct(flatten(collect_set("days"))).as("all_days"))

// Join the collected data back onto the main table and drop the helper id
val result = withId
  .join(collected, Seq("id"), "left")
  .drop("id")

Good luck!

qlfbtfca2#

You can create another dataset containing the unique day values, then join it back to the initial dataset:

import spark.implicits._
import org.apache.spark.sql.functions._

val data = Seq(
  Seq("sat", "sun"),
  Seq("mon", "wed"),
  Seq("fri"),
  Seq("fri", "sat"),
  Seq("mon", "sun", "sat")
)

val df = spark.sparkContext.parallelize(data).toDF("days")

// Explode the arrays into one row per day, collect the distinct values into a single array,
// and add a constant join key so the one-row result can be joined back to every row
val allDf = df
  .select(explode(col("days")).as("days"))
  .agg(collect_set("days").as("all_days"))
  .withColumn("join_column", lit(1))

df.withColumn("join_column", lit(1))
  .join(broadcast(allDf), Seq("join_column"), "left")
  .drop("join_column")
  .show(false)

+---------------+-------------------------+
|days           |all_days                 |
+---------------+-------------------------+
|[sat, sun]     |[fri, sun, wed, mon, sat]|
|[mon, wed]     |[fri, sun, wed, mon, sat]|
|[fri]          |[fri, sun, wed, mon, sat]|
|[fri, sat]     |[fri, sun, wed, mon, sat]|
|[mon, sun, sat]|[fri, sun, wed, mon, sat]|
+---------------+-------------------------+
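
Note that collect_set gives no ordering guarantee, which is why the two answers above show all_days in different orders. If a deterministic order matters, array_sort (Spark 2.4+) can be applied to the aggregated column; a minimal sketch based on the allDf above, with allDfSorted as an illustrative name:

val allDfSorted = df
  .select(explode(col("days")).as("days"))
  .agg(array_sort(collect_set("days")).as("all_days"))
  .withColumn("join_column", lit(1))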

nue99wik3#

You can collect the distinct values and then add them as a new column with withColumn:

import org.apache.spark.sql.functions._
import spark.implicits._

val data = Seq(Seq("sun", "sun", "mon"),
               Seq("sun", "tue", "mon"),
               Seq("fri", "tue"))
val days = data.toDF("days")

// Flatten all the day arrays, drop duplicates, and pull the result out as a single value
val uniqueDays = days.agg(array_distinct(flatten(collect_set("days")))).head()(0)

// Attach the collected value to every row as a literal column
days.withColumn("all days", lit(uniqueDays)).show(false)

The output is:

+---------------+--------------------+
|days           |all days            |
+---------------+--------------------+
|[sun, sun, mon]|[fri, tue, sun, mon]|
|[sun, tue, mon]|[fri, tue, sun, mon]|
|[fri, tue]     |[fri, tue, sun, mon]|
+---------------+--------------------+
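
If lit does not accept the collected sequence in your Spark version, typedLit can be used instead, since it handles Seq literals explicitly. A minimal sketch, extracting the value with an explicit element type (uniqueDaysSeq is an illustrative name):

// Same aggregation as above, but read the result back as Seq[String]
val uniqueDaysSeq = days
  .agg(array_distinct(flatten(collect_set("days"))))
  .head()
  .getSeq[String](0)

days.withColumn("all days", typedLit(uniqueDaysSeq)).show(false)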
