我有RDD,它显示为
["2\t{'3': 1}",
"3\t{'2': 2}",
"4\t{'1': 1, '2': 1}",
"5\t{'4': 3, '2': 1, '6': 1}",
"6\t{'2': 1, '5': 2}",
"7\t{'2': 1, '5': 1}",
"8\t{'2': 1, '5': 1}",
"9\t{'2': 1, '5': 1}",
"10\t{'5': 1}",
"11\t{'5': 2}"]
字符串
我可以把它分开,然后计算出'\t'之前的节点,或者我可以写一个函数来计算右边的节点。这是一个weighet DAG。如果我用手数,我看到有11个节点。但是在我做区分和计数之前,我无法弄清楚如何把右边的节点1带入节点。我的代码是
`import ast
def break_nodes(line):
data_dict = ast.literal_eval(line)
# Iterate through the dictionary items and print them
for key, value in data_dict.items():
print(f'key {key} val {value}')
yield (int(key))
nodeIDs = dataRDD.map(lambda line: line.split('\t')) \
.flatMap(lambda x: break_nodes(x[1])) \
.distinct()`
型
这只是从t的右边开始计算节点。我有左边的代码,非常简单
`nodeIDs = dataRDD.flatMap(lambda line: line.split('\t')[0])
totalCount = nodeIDs.distinct().count()`
型
我能对代码做些什么修改来计算所有的节点呢?尝试了这么多方法,我的大脑都被烧坏了。
感谢你的帮助
2条答案
按热度按时间mzmfm0qo1#
让我们使用
flatMap
来查找RDD中的所有节点,然后使用distinct
来获取唯一节点个字符
bn31dyow2#
如果
\t
后面的value是正确的JSON,则可以拆分字符串,第二部分解析为Map(Python dict),并将键和值添加到结果中。在Scala上:字符串
输出量:
型
注意:RDD在问题中提到,但可以轻松转换为DataFrame。