scala - Arranging columns in a Spark DataFrame by iterating over column values

Asked by kninwzqo on 2023-04-30, tagged Scala

I don't know whether this has been asked before, but I couldn't find a solution, so I'm posting the question. I have a Spark DataFrame with input values like the following:

+----------------------------------------------+-------------------------------------+
|                        market_Year           |       Quarter-Range                 |
+----------------------------------------------+-------------------------------------+
|/equity/dividends[DIV|1Y]                     |    [0D-6M]                          |
|/equity/dividends[DIV|1Y]                     |    [0D-6M],[6M-18M]                 |
|/equity/dividends[DIV|2Y]                     |    [18M-2Y]                         |
|/equity/dividends[DIV|4Y]                     |    [18M-2Y]                         |
|/equity/dividends[DIV|4Y]                     |    [18M-2Y]                         |
|/equity/dividends[DIV|2Y]                     |    [18M-2Y]                         |
|/equity/dividends[DIV|1Y]                     |    [6M-18M]                         |
+----------------------------------------------+-------------------------------------+

The output I'm looking for is:

+----------------------------------------------+-------------------------------------+
|                        market_Year           |                Quarter-Range        |
+----------------------------------------------+-------------------------------------+
|/equity/dividends[DIV|1Y]                     |  [0D-6M],[6M-18M]                   |
|/equity/dividends[DIV|2Y]                     |  [18M-2Y]                           |
|/equity/dividends[DIV|3Y]                     |  [18M-2Y]                           |
|/equity/dividends[DIV|4Y]                     |                                     |
+----------------------------------------------+-------------------------------------+

It would be a great help if someone could show me how to get this output without using a UDF.
Thanks in advance.

Dataset<Row> DF_to = printdf.withColumn("Quarter-Range", explode(array(col("Quarter-Range"))))
        .groupBy(col("market_Year"))
        .agg(collect_set("Quarter-Range"));

I get this output:

+----------------------------------------------+-------------------------------------+
|                        market_Year           |          Quarter-Range              |
+----------------------------------------------+-------------------------------------+
|/equity/dividends[DIV|1Y]                     |[[0D-6M],[6M-18M], [0D-6M], [6M-18M]]|
|/equity/dividends[DIV|2Y]                     |                           [[18M-2Y]]|
|/equity/dividends[DIV|3Y]                     |                           [[18M-2Y]]|
|/equity/dividends[DIV|4Y]                     |                                   []|
+----------------------------------------------+-------------------------------------+

Answer 1, by rjee0c15:

I'm not sure your example is correct.
Your example solution has a few small typos, and when I adjusted it and ran it, the output was not the same as the output you pasted.
Your expected output has a record for 3Y, but it doesn't exist in the input data, and it isn't produced by your adjusted example either.
If I understand correctly, your input column holds plain strings, so collect_set does not return what you expect. In your explode you should use split instead of array.
Here is my example in Scala (the core logic looks similar in Java):

import org.apache.spark.sql.functions._
// needed for .toDF on a local Seq; assumes a SparkSession named `spark` is in scope
import spark.implicits._

val inputData = Seq(
  ("/equity/dividends[DIV|1Y]", "[0D-6M]"),
  ("/equity/dividends[DIV|1Y]", "[0D-6M],[6M-18M]"),
  ("/equity/dividends[DIV|2Y]", "[18M-2Y]"),
  ("/equity/dividends[DIV|4Y]", "[18M-2Y]"),
  ("/equity/dividends[DIV|4Y]", "[18M-2Y]"),
  ("/equity/dividends[DIV|2Y]", "[18M-2Y]"),
  ("/equity/dividends[DIV|1Y]", "[6M-18M]")
)

val df = inputData.toDF("market_Year", "Quarter-Range")
df.withColumn("Quarter-Range", explode(split(col("Quarter-Range"), ",")))
  .groupBy("market_Year")
  .agg(
    collect_set("Quarter-Range")
      .as("Quarter-Range")
  )
  .show(truncate = false)

The output is:

+-------------------------+-------------------+
|market_Year              |Quarter-Range      |
+-------------------------+-------------------+
|/equity/dividends[DIV|1Y]|[[0D-6M], [6M-18M]]|
|/equity/dividends[DIV|2Y]|[[18M-2Y]]         |
|/equity/dividends[DIV|4Y]|[[18M-2Y]]         |
+-------------------------+-------------------+
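If the final result should match the expected output exactly, i.e. one comma-separated string per market_Year rather than an array, one possible extension of the same aggregation (just a sketch, not part of the original answer) is to join the collected set back into a string. array_join is available from Spark 2.4 onwards, and sort_array only adds a deterministic order:

df.withColumn("Quarter-Range", explode(split(col("Quarter-Range"), ",")))
  .groupBy("market_Year")
  .agg(
    // sort_array gives a deterministic order; array_join turns the set into a single string
    array_join(sort_array(collect_set("Quarter-Range")), ",").as("Quarter-Range")
  )
  .show(truncate = false)

With the example input this should yield [0D-6M],[6M-18M] for the 1Y row and [18M-2Y] for the 2Y and 4Y rows.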
