graphframes: merging edge nodes with similar column values

ff29svar asked on 2021-05-27 in Spark

tl;dr: How do I simplify a graph, removing edge nodes that have identical name values?
I have a graph defined as follows:

import graphframes
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
vertices = spark.createDataFrame([
    ('1', 'foo', '1'),
    ('2', 'bar', '2'),
    ('3', 'bar', '3'),
    ('4', 'bar', '5'),
    ('5', 'baz', '9'),
    ('6', 'blah', '1'),
    ('7', 'blah', '2'),
    ('8', 'blah', '3')
], ['id', 'name', 'value'])

edges = spark.createDataFrame([
    ('1', '2'),
    ('1', '3'),
    ('1', '4'),
    ('1', '5'),
    ('5', '6'),
    ('5', '7'),
    ('5', '8')
], ['src', 'dst'])

f = graphframes.GraphFrame(vertices, edges)

This produces a graph that looks like the following (where the numbers represent the vertex ids): vertex 1 (foo) has edges to vertices 2, 3 and 4 (bar) and to vertex 5 (baz), and vertex 5 has edges to vertices 6, 7 and 8 (blah).

Starting from the vertex with id equal to 1, I'd like to simplify the graph so that nodes with the same name value are merged into a single node. The resulting graph would look like this:

Note that we only have one foo (id 1), one bar (id 2), one baz (id 5) and one blah (id 6). The value of each vertex is irrelevant and is only there to demonstrate that each vertex is unique.
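Since the images aren't reproduced here, the target written out by hand as plain DataFrames (just to make the goal concrete, not part of any solution) would be something like:

# Hand-written illustration of the merged graph described above:
# one vertex per distinct name, and the edges collapsed accordingly.
expected_vertices = spark.createDataFrame([
    ('1', 'foo', '1'),
    ('2', 'bar', '2'),
    ('5', 'baz', '9'),
    ('6', 'blah', '1'),
], ['id', 'name', 'value'])

expected_edges = spark.createDataFrame([
    ('1', '2'),   # foo -> bar
    ('1', '5'),   # foo -> baz
    ('5', '6'),   # baz -> blah
], ['src', 'dst'])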
I tried to implement a solution, but it's hacky, terribly inefficient, and I'm sure there's a better way (I also don't think it actually works):

f = graphframes.GraphFrame(vertices, edges)

# Get the out degrees for our nodes. Nodes that do not appear in
# this dataframe have zero out degrees.
outs = f.outDegrees

# Merge this with our nodes.
vertices = f.vertices
vertices = f.vertices.join(outs, outs.id == vertices.id, 'left').select(vertices.id, 'name', 'value', 'outDegree')
vertices.show()

# Create a new graph with our out degree nodes.
f = graphframes.GraphFrame(vertices, edges)

# Find paths to all edge vertices from our vertex ID = 1
# Can we make this one operation instead of two??? What if we have more than two hops?
one_hop = f.find('(a)-[e]->(b)').filter('b.outDegree is null').filter('a.id == "1"')
one_hop.show()

two_hop = f.find('(a)-[e1]->(b); (b)-[e2]->(c)').filter('c.outDegree is null').filter('a.id == "1"')
two_hop.show()

# Super ugly, but union the vertices from the `one_hop` and `two_hop` above, and unique
# on the name.
vertices = one_hop.select('a.*').union(one_hop.select('b.*'))
vertices = vertices.union(two_hop.select('a.*').union(two_hop.select('b.*').union(two_hop.select('c.*'))))
vertices = vertices.dropDuplicates(['name'])
vertices.show()

# Do the same for the edges
edges = two_hop.select('e1.*').union(two_hop.select('e2.*')).union(one_hop.select('e.*')).distinct()

# We need to ensure that we have the respective nodes from our edges. We do this by
# ensuring the referenced vertex ID is in our `vertices` in both the `src` and the `dst`
# columns - This does NOT seem to work as I'd expect!
edges = edges.join(vertices, vertices.id == edges.src, "left").select("src", "dst")
edges = edges.join(vertices, vertices.id == edges.dst, "left").select("src", "dst")
edges.show()
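For what it's worth, the one-hop/two-hop motif searches could presumably be generalized to any number of hops by building the motif string in a loop; a rough, untested sketch along the same (hacky) lines, assuming the same f with the joined outDegree column as above:

# Rough sketch: build the k-hop motif string programmatically instead of
# hard-coding one_hop and two_hop. Assumes `f` is the GraphFrame whose
# vertices carry the joined outDegree column from above.
max_hops = 3  # arbitrary upper bound on the path length

all_path_edges = None
for k in range(1, max_hops + 1):
    motif = '; '.join(f'(v{i})-[e{i}]->(v{i + 1})' for i in range(k))
    paths = (f.find(motif)
              .filter('v0.id == "1"')
              .filter(f'v{k}.outDegree is null'))
    # collect every edge along each matched path
    for i in range(k):
        step_edges = paths.select(f'e{i}.*')
        all_path_edges = (step_edges if all_path_edges is None
                          else all_path_edges.union(step_edges))

all_path_edges = all_path_edges.distinct()
all_path_edges.show()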

Is there a simpler way to remove nodes (and their corresponding edges) so that the edge nodes are unique on their name?


xxslljrj1#

Why don't you simply treat the name column as the new id?

import graphframes

vertices = spark.createDataFrame([
    ('1', 'foo', '1'),
    ('2', 'bar', '2'),
    ('3', 'bar', '3'),
    ('4', 'bar', '5'),
    ('5', 'baz', '9'),
    ('6', 'blah', '1'),
    ('7', 'blah', '2'),
    ('8', 'blah', '3')
], ['id', 'name', 'value'])

edges = spark.createDataFrame([
    ('1', '2'),
    ('1', '3'),
    ('1', '4'),
    ('1', '5'),
    ('5', '6'),
    ('5', '7'),
    ('5', '8')
], ['src', 'dst'])

# create a dataframe with only one column
new_vertices = vertices.select(vertices.name.alias('id')).distinct()

# replace the src ids with the name column
new_edges = edges.join(vertices, edges.src == vertices.id, 'left')
new_edges = new_edges.select(new_edges.dst, new_edges.name.alias('src'))

# replace the dst ids with the name column
new_edges = new_edges.join(vertices, new_edges.dst == vertices.id, 'left')
new_edges = new_edges.select(new_edges.src, new_edges.name.alias('dst'))

# drop duplicate edges
new_edges = new_edges.dropDuplicates(['src', 'dst'])

new_edges.show()
new_vertices.show()

f = graphframes.GraphFrame(new_vertices, new_edges)

Output:

+---+----+
|src| dst|
+---+----+
|foo| baz|
|foo| bar|
|baz|blah|
+---+----+

+----+
|  id|
+----+
|blah|
| bar|
| foo|
| baz|
+----+
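
If you also need to keep one representative original id and value per merged vertex (as in the question's sketch, where foo keeps id 1, bar keeps id 2, and so on), you could aggregate the original vertices by name instead of just taking the distinct names; a rough sketch, not tested beyond the sample data:

from pyspark.sql import functions as F

# Rough sketch: one row per name, keeping a representative original id
# and value. min() makes the smallest id win (so bar keeps id 2); use
# whatever aggregation suits your data.
new_vertices = (vertices
                .groupBy('name')
                .agg(F.min('id').alias('orig_id'),
                     F.first('value').alias('value'))
                .withColumnRenamed('name', 'id'))

f = graphframes.GraphFrame(new_vertices, new_edges)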
