我试图用graphx计算一个图中来自一个源的边标签“comment”和“likedby”的数量。在第一步中,所有顶点都处于活动状态,然后我控制计算,并使用存储在每个顶点中的附加值发送消息,并使用消息进行更新。在第一次迭代中,只有相关的顶点计算并发送消息。与消息一起传播的第二个值是一个布尔值,它指示使用的边是用“comment”还是“likedby”标记的。
val fstId = ... // Considered as given
var nbComment, nbLike = 0L
sn.mapVertices((_, v) => (fstId, v)).pregel((fstId, false))(vprog, sendMsg, mergeMsg)
def vprog(id: Long, value: (Long, String), merged_msg: (Long, Boolean)) = {
println("activate " + id)
if (merged_msg != (fstId, false))
println("received message: " + merged_msg )
if (merged_msg._1 == id || merged_msg._1 < 0)
if (merged_msg._2) nbLike += 1L
else nbComment += 1L
(merged_msg._1, value._2)
}
def sendMsg(triplet: EdgeTriplet[(Long, String), String]) = {
if (triplet.srcId == fstId || triplet.srcAttr._1 == -1L)
if (triplet.attr == "comment"){
println((-1L, false) + " has been sent to " + triplet.dstId )
Iterator((triplet.dstId, (-1L, false)))
}
else if (triplet.attr == "likedBy") {
println((-1L, true) + " has been sent to " + triplet.dstId )
Iterator((triplet.dstId, (-1L, true)))
}
Iterator.empty
}
def mergeMsg(m1: (Long, Boolean), m2: (Long, Boolean)) = m1
理论上,顶点每次迭代只能接收一条消息。这个 mergeMsg
函数的定义只是为了尊重 pregel
签名。
我的问题是:邮件是正确发送的,但从来没有收到收件人和我不知道为什么。
暂无答案!
目前还没有任何答案,快来回答吧!