pyspark RDD计数DAG中的节点

qpgpyjmq  于 2023-11-16  发布在  Spark
关注(0)|答案(2)|浏览(100)

我有RDD,它显示为

["2\t{'3': 1}",
 "3\t{'2': 2}",
 "4\t{'1': 1, '2': 1}",
 "5\t{'4': 3, '2': 1, '6': 1}",
 "6\t{'2': 1, '5': 2}",
 "7\t{'2': 1, '5': 1}",
 "8\t{'2': 1, '5': 1}",
 "9\t{'2': 1, '5': 1}",
 "10\t{'5': 1}",
 "11\t{'5': 2}"]

字符串
我可以把它分开,然后计算出'\t'之前的节点,或者我可以写一个函数来计算右边的节点。这是一个weighet DAG。如果我用手数,我看到有11个节点。但是在我做区分和计数之前,我无法弄清楚如何把右边的节点1带入节点。我的代码是

`import ast
def break_nodes(line):
    data_dict = ast.literal_eval(line)
    
    # Iterate through the dictionary items and print them
    for key, value in data_dict.items():
        print(f'key {key} val {value}')
        yield (int(key))
        
    
nodeIDs = dataRDD.map(lambda line: line.split('\t')) \
                    .flatMap(lambda x: break_nodes(x[1])) \
                    .distinct()`


这只是从t的右边开始计算节点。我有左边的代码,非常简单

`nodeIDs = dataRDD.flatMap(lambda line: line.split('\t')[0])
totalCount = nodeIDs.distinct().count()`


我能对代码做些什么修改来计算所有的节点呢?尝试了这么多方法,我的大脑都被烧坏了。
感谢你的帮助

mzmfm0qo

mzmfm0qo1#

让我们使用flatMap来查找RDD中的所有节点,然后使用distinct来获取唯一节点

import ast

def find_all(r):
    x, y = r.split('\t')
    return [x, *ast.literal_eval(y).keys()]

nodes = dataRDD.flatMap(find_all).distinct()

个字符

bn31dyow

bn31dyow2#

如果\t后面的value是正确的JSON,则可以拆分字符串,第二部分解析为Map(Python dict),并将键和值添加到结果中。在Scala上:

val df = Seq("2\t{'3': 1}",
  "3\t{'2': 2}",
  "4\t{'1': 1, '2': 1}",
  "5\t{'4': 3, '2': 1, '6': 1}",
  "6\t{'2': 1, '5': 2}",
  "7\t{'2': 1, '5': 1}",
  "8\t{'2': 1, '5': 1}",
  "9\t{'2': 1, '5': 1}",
  "10\t{'5': 1}",
  "11\t{'5': 2}").toDF("value")

val disassembled = df
  .withColumn("splitted", split($"value", "\t"))
  // take first and last parts
  .withColumn("beginning", $"splitted".getItem(0))
  .withColumn("json", $"splitted".getItem(1))
  // convert to Map
  .withColumn("map", from_json(col("json"), MapType(StringType, StringType)))

disassembled.show(false)

val result = disassembled.select($"beginning".alias("ids"))
  .union(disassembled.select(explode(map_keys($"map"))))
  .union(disassembled.select(explode(map_values($"map"))))
  .distinct()

字符串
输出量:

+---------------------------+-----------------------------+---------+------------------------+------------------------+
|value                      |splitted                     |beginning|json                    |map                     |
+---------------------------+-----------------------------+---------+------------------------+------------------------+
|2\t{'3': 1}                |[2, {'3': 1}]                |2        |{'3': 1}                |{3 -> 1}                |
|3\t{'2': 2}                |[3, {'2': 2}]                |3        |{'2': 2}                |{2 -> 2}                |
|4\t{'1': 1, '2': 1}        |[4, {'1': 1, '2': 1}]        |4        |{'1': 1, '2': 1}        |{1 -> 1, 2 -> 1}        |
|5\t{'4': 3, '2': 1, '6': 1}|[5, {'4': 3, '2': 1, '6': 1}]|5        |{'4': 3, '2': 1, '6': 1}|{4 -> 3, 2 -> 1, 6 -> 1}|
|6\t{'2': 1, '5': 2}        |[6, {'2': 1, '5': 2}]        |6        |{'2': 1, '5': 2}        |{2 -> 1, 5 -> 2}        |
|7\t{'2': 1, '5': 1}        |[7, {'2': 1, '5': 1}]        |7        |{'2': 1, '5': 1}        |{2 -> 1, 5 -> 1}        |
|8\t{'2': 1, '5': 1}        |[8, {'2': 1, '5': 1}]        |8        |{'2': 1, '5': 1}        |{2 -> 1, 5 -> 1}        |
|9\t{'2': 1, '5': 1}        |[9, {'2': 1, '5': 1}]        |9        |{'2': 1, '5': 1}        |{2 -> 1, 5 -> 1}        |
|10\t{'5': 1}               |[10, {'5': 1}]               |10       |{'5': 1}                |{5 -> 1}                |
|11\t{'5': 2}               |[11, {'5': 2}]               |11       |{'5': 2}                |{5 -> 2}                |
+---------------------------+-----------------------------+---------+------------------------+------------------------+

+---------+
|ids      |
+---------+
|6        |
|4        |
|3        |
|2        |
|5        |
|10       |
|8        |
|11       |
|7        |
|9        |
|1        |
+---------+


注意:RDD在问题中提到,但可以轻松转换为DataFrame。

相关问题