我有一个Dataframe,它的行通过各种合并相互连接。到目前为止,我已经把df转换成下面的格式,在这里我做了一个groupby“merge\u to”,并将这些数据收集到一个数组中,然后将其连接回我原来的df。看起来是这样的:
df1
+---+--------+---------+
|Ref|Merge_To| Merges|
+---+--------+---------+
| 1| \N|[3, 2, 3]|
| 2| 1|[5, 4, 6]|
| 5| 2| [8, 7]|
| 10| \N| [9, 9]|
| 12| \N| [13]|
| 14| \N| [15]|
| 16| 18| [17]|
| 17| 16| [19]|
| 18| \N| [16]|
| 19| 17| [20]|
+---+--------+---------+
对于参考文献1、2、5和18、16、17、19、20,它们通过一条链合并在一起。我以前做过的groupby没有捕捉到。最终,我希望我的df看起来像这样,它解释了合并链:
+---+--------+------------------------+
|Ref|Merge_To| Merges|
+---+--------+------------------------+
| 1| \N|[3, 2, 3, 5, 4, 6, 8, 7]|
| 10| \N| [9, 9]|
| 12| \N| [13]|
| 14| \N| [15]|
| 18| \N| [16, 17, 19, 20]|
+---+--------+------------------------+
当“merge\u to”不可用时,我尝试将df1连接到已筛选的自身\n
val arrayCombineUDF = udf((a:Seq[String], b:Seq[String]) => a ++ b )
val df1Filter = df1.filter($"Merge_To" !== "\\N").
select("Merge_To", "Merges").withColumnRenamed("Merge_To", "Chain_Ref").
withColumnRenamed("Merges", "Chain_Merges")
val df2 = df1.join(df1Filter, $"Ref" === $"Chain_Ref", "left").
withColumn("Merges", when($"Chain_Merges".isNotNull, arrayCombineUDF($"Merges", $"Chain_Merges")).
otherwise($"Merges")).
select("Ref", "Merge_To", "Merges")
df2
+---+--------+----------+------------------+
|Ref|Merge_To|Merge_From| Merges|
+---+--------+----------+------------------+
| 1| \N| 3|[3, 2, 3, 5, 4, 6]|
| 2| 1| \N| [5, 4, 6, 8, 7]|
| 5| 2| \N| [8, 7]|
| 10| \N| 9| [9, 9]|
| 12| \N| 13| [13]|
| 14| \N| \N| [15]|
| 16| 18| \N| [17, 19]|
| 17| 16| \N| [19, 20]|
| 18| \N| \N| [16, 17]|
| 19| 17| \N| [20]|
+---+--------+----------+------------------+
这种类型的结果,我正在寻找,但实际上只占一个层次的合并链。
我还尝试过将上述相同的连接过程放入while循环中,以使其重复连接。我还尝试将udfs与if语句结合使用,希望能够将每一行分类为merge的类型,并使用它组合成一个链。
注意:我知道数组不清晰,但我不介意,我可以在最后排序。
编辑这是原始数据框
+---+--------+----------+
|Ref|Merge_To|Merge_From|
+---+--------+----------+
| 1| \N| 3|
| 2| 1| \N|
| 3| 1| \N|
| 4| 2| \N|
| 5| 2| \N|
| 6| 2| \N|
| 7| 5| \N|
| 8| 5| \N|
| 9| 10| \N|
| 10| \N| 9|
| 11| \N| \N|
| 12| \N| 13|
| 13| \N| \N|
| 14| \N| \N|
| 15| 14| \N|
| 16| 18| \N|
| 17| 16| \N|
| 18| \N| \N|
| 19| 17| \N|
| 20| 19| \N|
+---+--------+----------+
1条答案
按热度按时间5kgi1eie1#
11号条目似乎已经越狱了。不管怎样。
从您的基础数据来看,这是一个分层查询,可以在具有良好功能的传统RDBMS中解决,例如在大多数RDBMS中使用connect by子句或recursive with view。
你所做的尝试停留在了1级,这就是问题的症结所在。另外,spark的并行划分方法不能很好地解决这个问题。划分什么?任何分区都可能有与您要查找的集合相关的数据。
最好建议您在那里进行处理并将sqoop放入您的配置单元表,或者使用jdbc通过spark进行读取。
你可以按照这个没有很好记录的方法来模拟https://sqlandhadoop.com/how-to-implement-recursive-queries-in-spark/ 这里,这就是我有时用于bi幕后处理的地方。
如果您必须在spark领域中执行,那么使用graphframes方法如何,但运行速度相当慢,如下所示,使用一部分数据并稍微改变您的方法-看看您的想法:
导入org.apache.spark.rdd.rdd
退货:
我的建议是:在RDBMS中。使用graphframes需要很长时间。