Spark(Scala)将重复列表转换为(list_entry,count)的map

2ic8powd  于 12个月前  发布在  Scala
关注(0)|答案(1)|浏览(91)

因此,我当前的代码看起来沿着

val tokens =
    .read.option("wholetext", true).textFile(path)
    .flatMap(line => line.split(" ")
    .withColumn("filename", substring_index(input_file_name, "/", -1))
    .groupBy(col("value"))
    .agg(collect_list("filename") as "docs")

它读取path中的所有文本文档,并创建一个类似于以下内容的Dataframe:

|word1   |[1.txt]                            |
|word2   |[2.txt]                            |
|word3   |[2.txt, 1.txt, 1.txt]              |

现在我想把这些列表简化成这样(对不起,我实际上不知道Scala中的map是什么样子的)

|word1   |[(1.txt, 1)]                       |
|word2   |[(2.txt, 1)]                       |
|word3   |[(1.txt, 2), (2.txt, 1)]           |

理论上我知道该怎么做:取列表条目,将它们Map到(entry,1),然后通过将计数相加来减少它们。但我对Scala的经验很少,所以我不知道如何将其写入代码。
如上所述,我想让文档名称成为Map中的键,以使访问计数更容易。

but5z9lq

but5z9lq1#

这应该可以了,得到一个Array,你可以转换成Map。这里你得到了一个字符串数组,你需要根据逗号分成2个字段,并在Spark Scala Dataframe convert a column of Array of Struct to a column of Map上寻找指导,我投了赞成票。您也可以使用struct代替Array

import org.apache.spark.sql.functions._
val df = spark.read.textFile("/FileStore/tables/fff1.txt", "/FileStore/tables/fff2.txt")
val df2 = df.flatMap(_.split(" ")).withColumn("filename", input_file_name).groupBy("Value", "filename").count()
val df3 = df2.groupBy("Value").agg(collect_list(array("filename", "count")) as "docs")
df3.show(false)

返回:

+-----+----------------------------------------------------------------------------+
|Value|docs                                                                        |
+-----+----------------------------------------------------------------------------+
|you  |[[dbfs:/FileStore/tables/fff2.txt, 2], [dbfs:/FileStore/tables/fff1.txt, 1]]|
|fine |[[dbfs:/FileStore/tables/fff2.txt, 1], [dbfs:/FileStore/tables/fff1.txt, 1]]|
|how  |[[dbfs:/FileStore/tables/fff2.txt, 1], [dbfs:/FileStore/tables/fff1.txt, 1]]|
|hear |[[dbfs:/FileStore/tables/fff2.txt, 1], [dbfs:/FileStore/tables/fff1.txt, 1]]|
|ok   |[[dbfs:/FileStore/tables/fff2.txt, 1]]                                      |
|have |[[dbfs:/FileStore/tables/fff2.txt, 1]]                                      |
...

为了完整性,要获得Map:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val arrayToMap = udf[Map[String, Long], Seq[Row]] {
  array => array.map { case Row(key: String, value: Long) => (key, value) }.toMap
}

val df = spark.read.textFile("/FileStore/tables/fff1.txt", "/FileStore/tables/fff2.txt")
val df2 = df.flatMap(_.split(" ")).withColumn("filename", input_file_name).groupBy("Value", "filename").count()
val df3 = df2.groupBy("Value").agg(collect_list(struct("filename", "count")))   

val df4 = df3.withColumn("words", arrayToMap($"collect_list(struct(filename, count))"))
df4.show(false)

相关问题