scala—在sparkDataframe中组合链接在一起的行

zlhcx6iw  于 2021-05-24  发布在  Spark
关注(0)|答案(1)|浏览(347)

我有一个Dataframe,它的行通过各种合并相互连接。到目前为止,我已经把df转换成下面的格式,在这里我做了一个groupby“merge\u to”,并将这些数据收集到一个数组中,然后将其连接回我原来的df。看起来是这样的:

df1
+---+--------+---------+
|Ref|Merge_To|   Merges|
+---+--------+---------+
|  1|      \N|[3, 2, 3]|
|  2|       1|[5, 4, 6]|
|  5|       2|   [8, 7]|
| 10|      \N|   [9, 9]|
| 12|      \N|     [13]|
| 14|      \N|     [15]|
| 16|      18|     [17]|
| 17|      16|     [19]|
| 18|      \N|     [16]|
| 19|      17|     [20]|
+---+--------+---------+

对于参考文献1、2、5和18、16、17、19、20,它们通过一条链合并在一起。我以前做过的groupby没有捕捉到。最终,我希望我的df看起来像这样,它解释了合并链:

+---+--------+------------------------+
|Ref|Merge_To|                  Merges|
+---+--------+------------------------+
|  1|      \N|[3, 2, 3, 5, 4, 6, 8, 7]|
| 10|      \N|                  [9, 9]|
| 12|      \N|                    [13]|
| 14|      \N|                    [15]|
| 18|      \N|        [16, 17, 19, 20]|
+---+--------+------------------------+

当“merge\u to”不可用时,我尝试将df1连接到已筛选的自身\n

val arrayCombineUDF = udf((a:Seq[String], b:Seq[String]) => a ++ b )

val df1Filter = df1.filter($"Merge_To" !== "\\N").
select("Merge_To", "Merges").withColumnRenamed("Merge_To", "Chain_Ref").
withColumnRenamed("Merges", "Chain_Merges")

val df2 = df1.join(df1Filter, $"Ref" === $"Chain_Ref", "left").
withColumn("Merges", when($"Chain_Merges".isNotNull, arrayCombineUDF($"Merges", $"Chain_Merges")).
otherwise($"Merges")).
select("Ref", "Merge_To", "Merges")

df2
+---+--------+----------+------------------+
|Ref|Merge_To|Merge_From|            Merges|
+---+--------+----------+------------------+
|  1|      \N|         3|[3, 2, 3, 5, 4, 6]|
|  2|       1|        \N|   [5, 4, 6, 8, 7]|
|  5|       2|        \N|            [8, 7]|
| 10|      \N|         9|            [9, 9]|
| 12|      \N|        13|              [13]|
| 14|      \N|        \N|              [15]|
| 16|      18|        \N|          [17, 19]|
| 17|      16|        \N|          [19, 20]|
| 18|      \N|        \N|          [16, 17]|
| 19|      17|        \N|              [20]|
+---+--------+----------+------------------+

这种类型的结果,我正在寻找,但实际上只占一个层次的合并链。
我还尝试过将上述相同的连接过程放入while循环中,以使其重复连接。我还尝试将udfs与if语句结合使用,希望能够将每一行分类为merge的类型,并使用它组合成一个链。
注意:我知道数组不清晰,但我不介意,我可以在最后排序。
编辑这是原始数据框

+---+--------+----------+
|Ref|Merge_To|Merge_From|
+---+--------+----------+
|  1|      \N|         3|
|  2|       1|        \N|
|  3|       1|        \N|
|  4|       2|        \N|
|  5|       2|        \N|
|  6|       2|        \N|
|  7|       5|        \N|
|  8|       5|        \N|
|  9|      10|        \N|
| 10|      \N|         9|
| 11|      \N|        \N|
| 12|      \N|        13|
| 13|      \N|        \N|
| 14|      \N|        \N|
| 15|      14|        \N|
| 16|      18|        \N|
| 17|      16|        \N|
| 18|      \N|        \N|
| 19|      17|        \N|
| 20|      19|        \N|
+---+--------+----------+
5kgi1eie

5kgi1eie1#

11号条目似乎已经越狱了。不管怎样。
从您的基础数据来看,这是一个分层查询,可以在具有良好功能的传统RDBMS中解决,例如在大多数RDBMS中使用connect by子句或recursive with view。
你所做的尝试停留在了1级,这就是问题的症结所在。另外,spark的并行划分方法不能很好地解决这个问题。划分什么?任何分区都可能有与您要查找的集合相关的数据。
最好建议您在那里进行处理并将sqoop放入您的配置单元表,或者使用jdbc通过spark进行读取。
你可以按照这个没有很好记录的方法来模拟https://sqlandhadoop.com/how-to-implement-recursive-queries-in-spark/ 这里,这就是我有时用于bi幕后处理的地方。
如果您必须在spark领域中执行,那么使用graphframes方法如何,但运行速度相当慢,如下所示,使用一部分数据并稍微改变您的方法-看看您的想法:
导入org.apache.spark.rdd.rdd

import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.graphframes._  
 sc.setCheckpointDir("/checkpoints")

// Subset of your data
val rdd = sc.parallelize( Array(("A", 1, None), ("B", 2, Some(1)), ("C", 3, Some(1)), ("D", 4, Some(2)), ("E", 5, Some(2)), ("F", 6, Some(2)),
                                ("G", 7, Some(5)), ("H", 8, Some(5)), ("X", 9, Some(10)), ("Y", 10, None), ("X2", 12, Some(13)), ("Y3", 13, None)  
                               ))
val df = rdd.toDF("v", "c", "p")

val dfV = df.select($"c".as("id"))
val dfE = df
  .withColumnRenamed("c", "src")
  .withColumnRenamed("p", "dst")

val nGraph = GraphFrame(dfV, dfE)
dfE.cache()
dfV.cache()
val res = nGraph.connectedComponents.run()
val res2 = res.join(df, res("id") === df("c"), "inner")
val res3 = res2.filter("p is not null").groupBy("component").agg(collect_list("id") as "group")
val res4 = res3.join(res2, res3("component") === res2("component") && res2("p").isNull , "inner")
res4.select($"id", $"group").show(false)

退货:

+---+---------------------+
|id |group                |
+---+---------------------+
|1  |[2, 3, 4, 5, 6, 7, 8]|
|10 |[9]                  |
|13 |[12]                 |
+---+---------------------+

我的建议是:在RDBMS中。使用graphframes需要很长时间。

相关问题