scala—将tuple2的值部分(即Map)组合成单个Map,并按tuple2的键进行分组

lvjbypge  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(703)

我在scala和spark做这个。
我有和 DatasetTuple2 作为 Dataset[(String, Map[String, String])] .
下面是 Dataset .

(A, {1->100, 2->200, 3->100})
(B, {1->400, 4->300, 5->900})
(C, {6->100, 4->200, 5->100})
(B, {1->500, 9->300, 11->900})
(C, {7->100, 8->200, 5->800})

如果注意到,元组的键或第一个元素可以重复。同样,相同元组键的对应Map在Map中可以有重复的键(tuple2的第二部分)。
我想创造一个最终的 Dataset[(String, Map[String, String])] . 输出应如下所示(来自上面的示例)。此外,Map的最后一个键的值将被保留(选中b和c),并且之前针对b和c的相同键将被删除。

(A, {1->100, 2->200, 3->100})
(B, {4->300, 1->500, 9->300, 11->900, 5->900})
(C, {6->100, 4->200, 7->100, 8->200, 5->800})

如果需要澄清,请告诉我。

avkwfej4

avkwfej41#

使用Dataframe:

val df = Seq(("A", Map(1 -> 100, 2 -> 200, 3 -> 100)),
    ("B", Map(1 -> 400, 4 -> 300, 5 -> 900)),
    ("C", Map(6 -> 100, 4 -> 200, 5 -> 100)),
    ("B", Map(1 -> 500, 9 -> 300, 11 -> 900)),
    ("C", Map(7 -> 100, 8 -> 200, 5 -> 800))).toDF("a", "b")

val df2 = df.select('a, explode('b))
    .groupBy("a", "key")           //remove the duplicate keys
    .agg(last('value).as("value")) //and take the last value for duplicate keys
    .groupBy("a")
    .agg(map_from_arrays(collect_list('key), collect_list('value)).as("b"))
df2.show()

印刷品

+---+---------------------------------------------------+
|a  |b                                                  |
+---+---------------------------------------------------+
|B  |[5 -> 900, 9 -> 300, 1 -> 500, 4 -> 300, 11 -> 900]|
|C  |[6 -> 100, 8 -> 200, 7 -> 100, 4 -> 200, 5 -> 800] |
|A  |[3 -> 100, 1 -> 100, 2 -> 200]                     |
+---+---------------------------------------------------+

由于涉及到两个聚合,基于rdd的答案可能会更快

lxkprmvk

lxkprmvk2#

通过使用rdd,

val rdd = sc.parallelize(
    Seq(("A", Map(1->100, 2->200, 3->100)),
        ("B", Map(1->400, 4->300, 5->900)),
        ("C", Map(6->100, 4->200, 5->100)),
        ("B", Map(1->500, 9->300, 11->900)),
        ("C", Map(7->100, 8->200, 5->800)))
)

rdd.reduceByKey((a, b) => a ++ b).collect()

// Array((A,Map(1 -> 100, 2 -> 200, 3 -> 100)), (B,Map(5 -> 900, 1 -> 500, 9 -> 300, 11 -> 900, 4 -> 300)), (C,Map(5 -> 800, 6 -> 100, 7 -> 100, 8 -> 200, 4 -> 200)))

使用Dataframe,

val df = spark.createDataFrame(
    Seq(("A", Map(1->100, 2->200, 3->100)),
        ("B", Map(1->400, 4->300, 5->900)),
        ("C", Map(6->100, 4->200, 5->100)),
        ("B", Map(1->500, 9->300, 11->900)),
        ("C", Map(7->100, 8->200, 5->800)))
).toDF("key", "map")

spark.conf.set("spark.sql.mapKeyDedupPolicy","LAST_WIN")

df.withColumn("map", map_entries($"map"))
  .groupBy("key").agg(collect_list($"map").alias("map"))
  .withColumn("map", flatten($"map"))
  .withColumn("map", map_from_entries($"map")).show(false)

+---+---------------------------------------------------+
|key|map                                                |
+---+---------------------------------------------------+
|B  |[1 -> 500, 4 -> 300, 5 -> 900, 9 -> 300, 11 -> 900]|
|C  |[6 -> 100, 4 -> 200, 5 -> 800, 7 -> 100, 8 -> 200] |
|A  |[1 -> 100, 2 -> 200, 3 -> 100]                     |
+---+---------------------------------------------------+

相关问题