我正在用graphx绘制一个图形。我想以这样一种方式来实现这个算法:每个节点向它的邻居发送它的属性,每个邻居接收消息,然后保存它的得分最高的属性。例如,发送到节点的消息如下: (4,0.96),(8,0.1),(15,0.8),...
. 第一个数字是标签,第二个数字是该标签的分数。在这种情况下,将选择得分为0.96的标签4,因为此时它的得分最高。在算法的最后,每个节点都有一个列表,其中存储了在每次迭代中得分最高的标签。
我正在使用的图形具有如下结构:
(1,(11,0.2))
(2,(8,0.6))
(3,(5,0.3))
(4,(4,1))
...
键是nodeid,属性由 label
哪个是 Long
以及 score
标签的 Double
.
预期的最终结构类似于此结构:
(1,List((2,0.49),(8,0.9),(13,0.79)))
(2,List((11,0.89),(6,0.68),(13,0.79),(10,0.57)))
(3,List((20,0.0.8),(1,0.66)))
...
上面的结构意味着节点1已经接收到三个标签,分别是2、8和13。
我试图使用pregel作为我的算法,但我面临着一些类型不匹配的问题。有人能帮我用pregel实现代码吗?我会非常感激的。
这是我试图写的代码,但我不能完成它!
def run[VD, ED: ClassTag](graph: Graph[VD, ED], maxSteps: Int): Graph[VD, ED] = {
val temp_graph = graph
def sendMessage(e: EdgeTriplet[VertexId, ED]): Iterator[(VertexId,(Long,Double))] = {
Iterator((e.srcId,e.dstAttr), (e.dstId,e.srcAttr))
}
def mergeMessage() = {
}
def vertexProgram(vid: VertexId, attr: (Long, Double), message: (Long, Double)) = {
}
val initialMessage = (Long, Double)
Pregel(temp_graph, initialMessage, maxIterations = maxSteps)(
vprog = vertexProgram,
sendMsg = sendMessage,
mergeMsg = mergeMessage)
}
val new_updated_graph = run(new_graph,5)
暂无答案!
目前还没有任何答案,快来回答吧!