为什么在spark graphx中执行pregel时出现类型不匹配错误?

nfzehxib  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(420)

我已经用spark graphx中的pregel编写了我的算法。但不幸的是,我得到了类型不匹配的错误。我在图表中加载: val my_graph= GraphLoader.edgeListFile(sc, path) . 因此,节点的开头具有如下结构:

(1,1)
(2,1)
(3,1)

以nodeid作为键,1是其默认属性。
内部 run2 函数,首先我更改结构,以便使每个节点可以存储多个属性。因为我正在研究重叠社区检测算法,所以属性是标签和它们的分数。在第一次运行时 run2 ,每个节点具有如下结构:

(34,Map(34 -> (1.0,34)))
(13,Map(13 -> (1.0,13)))
(4,Map(4 -> (1.0,4)))
(16,Map(16 -> (1.0,16)))
(22,Map(22 -> (1.0,22)))

这意味着节点34具有标签34,其得分等于1。然后每个节点可以存储从其邻居接收的几个属性,并在接下来的步骤中将它们发送给其邻居。
在算法结束时,每个节点可以包含多个属性或仅包含一个属性,如以下结构:

(1,Map((2->(0.49,1),(8->(0.9,1)),(13->(0.79,1))))
(2,Map((11->(0.89,2)),(6->(0.68,2)),(13->(0.79,2)),(10->(0.57,2))))
(3,Map((20->(0.0.8,3)),(1->(0.66,3))))

上面的结构表明,例如,节点1属于社区2,得分为0.49;节点8属于社区8,得分为0.9;节点13属于社区13,得分为0.79。
下面的代码显示了pregel中定义的不同函数。

def run2[VD, ED: ClassTag](graph: Graph[VD, ED], maxSteps: Int) = {

  val temp_graph = graph.mapVertices { case (vid, _) => mutable.HashMap[VertexId, (Double,VertexId)](vid -> (1,vid)) }

  def sendMessage(e: EdgeTriplet[mutable.HashMap[VertexId, (Double,VertexId)], ED]): Iterator[(VertexId, mutable.HashMap[VertexId, (Double, VertexId)])] = {
    Iterator((e.srcId,e.dstAttr), (e.dstId,e.srcAttr))
  }

  def mergeMessage(count1: (mutable.HashMap[VertexId, (Double,VertexId)]), count2: (mutable.HashMap[VertexId, (Double,VertexId)]))= {

    val communityMap = new mutable.HashMap[VertexId, List[(Double, VertexId)]]

    (count1.keySet ++ count2.keySet).map(key => {

      val count1Val = count1.getOrElse(key, (0D,0:VertexId))
      val count2Val = count2.getOrElse(key, (0D,0:VertexId))

      communityMap += key->(count1Val::communityMap(key))
      communityMap += key->(count2Val::communityMap(key))

    })
    communityMap
  }

  def vertexProgram(vid: VertexId, attr: mutable.HashMap[VertexId,(Double, VertexId)], message: mutable.HashMap[VertexId, List[(Double, VertexId)]]) = {
    if (message.isEmpty)
      attr
    else {
      val labels_score: mutable.HashMap[VertexId, Double] = message.map {
        key =>
          var value_sum = 0D
          var isMemberFlag = 0
          var maxSimilar_result = 0D
          val max_similar = most_similar.filter(x=>x._1==vid)(1)
          if (key._2.exists(x=>x._2==max_similar)) isMemberFlag = 1 else isMemberFlag = 0

          key._2.map {
            values =>
              if (values._2==max_similar) maxSimilar_result = values._1 else maxSimilar_result = 0D

              val temp = broadcastVariable.value(vid)(values._2)._2
              value_sum += values._1 * temp
          }
          value_sum += (beta*value_sum)+((1-beta)*maxSimilar_result)
          (key._1,value_sum) //label list
      }

      val max_value = labels_score.maxBy(x=>x._2)._2.toDouble
      val dividedByMax = labels_score.map(x=>(x._1,x._2/max_value)) // divide by maximum value

      val resultMap: mutable.HashMap[VertexId,Double] = new mutable.HashMap[VertexId, Double]
      dividedByMax.foreach{ row => // select labels more than threshold P = 0.5
        if (row._2 >= p) resultMap += row
      }

      val max_for_normalize= resultMap.values.sum
      val res = resultMap.map(x=>(x._1->(x._2/max_for_normalize,x._1))) // Normalize labels

      res
    }
  }

  val initialMessage = mutable.HashMap[VertexId, (Double,VertexId)]()

  val overlapCommunitiesGraph = Pregel(temp_graph, initialMessage, maxIterations = maxSteps)(
    vprog = vertexProgram,
    sendMsg = sendMessage,
    mergeMsg = mergeMessage)

  overlapCommunitiesGraph
}

val my_graph= GraphLoader.edgeListFile(sc, path)
val new_updated_graph2 = run2(my_graph, 1)

在上述代码中, p=0.5 以及 beta=0.5 . most_similar 是包含每个节点及其最重要节点的rdd。例如 (1,3) 意味着节点3是与节点1最相似的邻居。这个 broadcatVariable 结构如下:

(19,Map(33 -> (1.399158675718661,0.6335049099178383), 34 -> (1.4267350687130098,0.6427405501408145)))

(15,Map(33 -> (1.399158675718661,0.6335049099178383), 34 -> (1.4267350687130098,0.6427405501408145)))
...

该结构显示节点作为键与其邻居作为值之间的关系。例如,节点19与节点33和34是邻居,并且通过它们之间的得分来显示关系。
在该算法中,每个节点发送每个 Map 包含几个标签和它们的分数。然后在 mergeMessage 函数,将具有相同编号的标签的值放入 List 而在 vertexProgram 对于每个标签或键,都会处理其列表。
更新
根据下图中的等式,我使用 List 为标签收集不同的分数并在 vertexProgram 功能。因为我需要 P_ji 为了处理每个节点的标签分数,所以我不知道是否可以在 mergeMessage 功能或是否需要 vertexProgram . P_ji 是源节点与其相邻节点之间的分数,该分数应乘以标签分数。

我得到的错误显示在行的前面 vprog = vertexProgram, 如图所示。有人能帮我解决这个错误吗?

xkrw2x1b

xkrw2x1b1#

主要的问题是您对消息使用了两种不同的类型。初始消息的类型为 mutable.HashMap[VertexId, (Double,VertexId)] 但是在合并了两个(和 mergeMessage 函数)类型变为 mutable.HashMap[VertexId, List[(Double,VertexId)]] . 这里的问题是,现在合并的消息无法与其他消息合并,因为类型错误。
有两种方法可以解决这个问题:
将消息类型更改为 mutable.HashMap[VertexId, List[(Double,VertexId)]] ,确保初始消息与此匹配。
将消息类型保留为 mutable.HashMap[VertexId, (Double,VertexId)] 并更改 mergeMessage 匹配。
下面是两个选项可能解决方案的一些示意图。其中可能有一些错误,因为实际需要的逻辑不是很清楚(代码中有一些未使用的变量等)。这两个选项在与其余代码组合时都可以运行,并将返回一个新的图形。
解决方案1:
你需要调整 sendMessage , mergeMessage 以及 initialMessage 处理列表。具体做法如下:

def sendMessage(e: EdgeTriplet[Map[VertexId, (Double,VertexId)], ED]): Iterator[(VertexId, Map[VertexId, List[(Double, VertexId)]])] = {
  val msg1 = e.dstAttr.map{ case (k,v) => (k, List(v)) }
  val msg2 = e.srcAttr.map{ case (k,v) => (k, List(v)) }
  Iterator((e.srcId, msg1), (e.dstId, msg2))
}

def mergeMessage(count1: Map[VertexId, List[(Double,VertexId)]], count2: Map[VertexId, List[(Double,VertexId)]])= {
  val merged = count1.toSeq ++ count2.toSeq
  val new_message = merged.groupBy(_._1).map{case (k,v) => (k, v.map(_._2).flatten.toList)}
  new_message
}

val initialMessage = Map[VertexId, List[(Double,VertexId)]]()

可能是 messages.isEmpty 返回 vertexProgram 也需要调整。
解决方案2:
要使用不带列表的消息,需要将合并逻辑从 vertexProgrammergeMessage . 我稍微简化了代码,因此代码可能需要一些测试。

def mergeMessage(count1: (Map[VertexId, (Double, VertexId)]), count2: (Map[VertexId, (Double, VertexId)]))= {

  val merged = count1.toSeq ++ count2.toSeq
  val grouped = merged.groupBy(_._1)

  val new_message = grouped.map{ case (key, key_values) =>
    val values = key_values.map(_._2)

    val max_similar = most_similar.filter(x => x._1 == key).headOption match {
      case Some(x) => x  
      case _ => -1   // What should happen when there is no match?
    }

    val maxSimilar_result = values.filter(v => v._2 == max_similar).headOption match {
      case Some(x) => x._1
      case _ => 0.0
    }

    val value_sum = values.map{ v => v._1 * broadcastVariable.value(key)(v._2)._2}.sum
    val res = (beta*value_sum)+((1-beta)*maxSimilar_result)
    (key, (res, key))
  }

  new_message.toMap
}

def vertexProgram(vid: VertexId, attr: Map[VertexId, (Double, VertexId)], messages: Map[VertexId, (Double, VertexId)]) = {
  if (messages.isEmpty){
    attr
  } else { 
    val labels_score = messages.map(m => (m._1, m._2._1))
    val max_value = labels_score.maxBy(x => x._2)._2.toDouble
    val dividedByMax = labels_score.map(x => (x._1, x._2 / max_value)) // divide by maximum value

    // select labels more than threshold P = 0.5
    val resultMap = dividedByMax.filter{ row => row._2 >= p }

    val max_for_normalize= resultMap.values.sum
    val res = resultMap.map(x => (x._1 -> (x._2 / max_for_normalize, x._1))) // Normalize labels

    res
  }
}

笔记:
目前在 sendMessage ,消息被发送到两个节点,与图边的方向无关。如果这是正确的取决于想要的逻辑。
我变了 mutable.HashMap 正常(不变) Map . 如果可能的话,最好使用不可变选项。
解决方案1应该更易于用作中的逻辑 vertexProgram 很复杂。还有一些变量,目前没有做任何事情,但也许他们会被使用以后。如果无法以迭代方式合并消息(并且您需要一次查看所有消息),那么使用 List 会是最好的选择。

相关问题