如何通过合并pregel消息数组和上一次迭代中的数组来更新withvertexcolumn

inb24sb2  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(333)

我试图实现一个简单的消息传递系统,以了解工作原理。我的目标是将每个顶点的名称发送给父节点。所需的实现是将先前实现创建的“iterationmsg”的现有数组合并到具有这些值的新数组中。我已经使用aggregatemessages函数实现了这一点,它使用了与信念传播相同的逻辑,但是我愿意使用PregelAPI来实现这一点。如何在“iterationmsg中的现有数组”处引用“iterationmsg”中的数组?

v = sqlc.createDataFrame([
            (0, "Anna", 24),
            (1, "Bob", 26),
            (2, "Charlie", 27),
            (3, "David", 27),
            (4, "Eric", 26),
        ], ["id", "name", "age"])

        # Edge DataFrame
    e = sqlc.createDataFrame([
            (0, 0, 1, "friend"),
            (1, 1, 2, "friend"),
            (2, 2, 1, "friend"),
            (3, 1, 1, "loves"),
            (4, 2, 0, "pays"),
            (5, 0, 3, "boss"),
            (6, 3, 4, "boss"),
            (7, 0, 4, "boss")
        ], ["id", "src", "dst", "label"])

   g = GraphFrame(v.withColumn('state', f.lit(True)).withColumn('iterationMsg', f.array()), e)  

   NR_ITER = 2

   msgpassing = g.pregel \
        .setMaxIter(NR_ITER) \
        .withVertexColumn("iterationMsg", f.array(), 
                          f.array_union('existing array in iterationMsg', Pregel.msg())) \
        .sendMsgToSrc(f.when(Pregel.dst('state'),
                             f.array_union(f.array(Pregel.dst('name')), Pregel.dst('iterationMsg')))) \
        .aggMsgs(f.array(Pregel.msg())) \
        .run()

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题