为什么我在graphx的pregel中遇到类型不匹配错误?

fgw7neuy  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(193)

我正在用graphx绘制一个图形。我想使用pregelapi对图形进行并行操作。我正在研究的图表结构如下:

(1,(4,0.08))
(2,(9,0.9))
(3,(3,0.01))
(4,(1, 0.31))
...

键是nodeid,属性由 label 哪个是 vertexid 以及标签的分数 Double .
我想以这样一种方式来实现这个算法:每个节点向它的邻居发送它的属性,每个邻居接收消息,然后保存它的得分最高的属性。例如,发送到节点的消息如下:(4,0.96),(8,0.1),(15,0.8),。。。。第一个数字是标签,第二个数字是该标签的分数。在这种情况下,将选择得分为0.96的标签4,因为此时它的得分最高。在算法的最后,每个节点都有一个列表,其中存储了在每次迭代中得分最高的标签。
预期的最终结构类似于此结构:

(1,List((2,0.49),(8,0.9),(13,0.79)))
(2,List((11,0.89),(6,0.68),(13,0.79),(10,0.57)))
(3,List((20,0.0.8),(1,0.66)))
...

上面的结构意味着节点1已经接收到三个标签,分别是2、8和13。
我试图使用pregel作为我的算法,但我面临着一些类型不匹配的问题。有人能帮我用pregel实现代码吗?我会非常感激的。
这是我试图写的代码,但我不能完成它!

def run[VD, ED: ClassTag](graph: Graph[VD, ED], maxSteps: Int): Graph[VD, ED] = {

      val temp_graph: Graph[(VertexId, Double), ED] = graph.mapVertices((x, y) => (x,1.toDouble))

      def sendMessage(e: EdgeTriplet[(VertexId, Double), ED]): Iterator[(VertexId, (VertexId, Double))] = {
        Iterator((e.srcId, e.dstAttr), (e.dstId, e.srcAttr))
      }

      def mergeMessage(count1: Map[VertexId, Double], count2: Map[VertexId, Double]): mutable.Map[VertexId, Double] = {

        val map = mutable.Map[VertexId, Double]()
        (count1.keySet ++ count2.keySet).foreach { i =>
          val count1Val = count1.getOrElse(i, 0D)
          val count2Val = count2.getOrElse(i, 0D)
          map.put(i, count1Val + count2Val)
        }
        map
      }

      def vertexProgram(vid: VertexId, attr: (VertexId, Double), message: Map[VertexId, Double]): (VertexId, Double) = {

        if (message.isEmpty) attr
        else message.maxBy(_._2) 
        // I have some problem here in adding the the label with maximum score to label list of node

      }

      val initialMessage = Map[VertexId, Double]()
      Pregel(temp_graph, initialMessage, maxIterations = maxSteps)(
        vprog = vertexProgram,
        sendMsg = sendMessage,
        mergeMsg = mergeMessage)
    }

这是我得到的错误:

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题