Scala — how to correctly create a GraphX graph with node and edge attributes

Asked by 2ledvvac on 2021-07-14, in Spark

I am running a Scala program in a Jupyter notebook (spylon kernel) that performs some operations on a network.
After some preprocessing I end up with two DataFrames, one for the nodes and one for the edges, with the following shapes.
For the nodes:

+---+--------------------+-------+--------+-----+
| id|                node|trip_id| stop_id| time|
+---+--------------------+-------+--------+-----+
|  0|0b29d98313189b650...| 209518|u0007405|56220|
|  1|45adb49a23257198e...| 209518|u0007409|56340|
|  2|fe5f4e2dc48b97f71...| 209518|u0007406|56460|
|  3|7b32330b6fe10b073...| 209518|u0007407|56580|
+---+--------------------+-------+--------+-----+
only showing top 4 rows

vertices_complete: org.apache.spark.sql.DataFrame = [id: bigint, node: string ... 3 more fields]

For the edges:

+------+-----+----+------+------+---------+---------+--------+
|   src|  dst|time|method|weight|walk_time|wait_time|bus_time|
+------+-----+----+------+------+---------+---------+--------+
| 65465|52067|2640|  walk|2640.0|     1112|     1528|       0|
| 68744|52067|1740|  walk|1740.0|      981|      759|       0|
| 55916|52067|2700|  walk|2700.0|     1061|     1639|       0|
|124559|52067|1440|  walk|1440.0|     1061|      379|       0|
| 23036|52067|1800|  walk|1800.0|     1112|      688|       0|
+------+-----+----+------+------+---------+---------+--------+
only showing top 5 rows

edges_DF: org.apache.spark.sql.DataFrame = [src: bigint, dst: bigint ... 6 more fields]

I would like to build a Graph object from these so that I can run PageRank, compute shortest paths, and so on. I therefore converted the DataFrames into RDDs:

import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

val verticesRDD: RDD[(VertexId, (String, Long, String, Long))] = vertices_complete.rdd
    .map(row =>
        (row.getAs[Long](0),
         (row.getAs[String]("node"), row.getAs[Long]("trip_id"),
          row.getAs[String]("stop_id"), row.getAs[Long]("time"))))

val edgesRDD: RDD[Edge[Long]] = edges_DF.rdd
    .map(row =>
        Edge(row.getAs[Long]("src"), row.getAs[Long]("dst"), row.getAs[Long]("weight")))

val my_graph = Graph(verticesRDD, edgesRDD)
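
As a sanity check (a diagnostic sketch; printSchema is the standard DataFrame API): Row.getAs[T] is an unchecked cast, so a column that is actually IntegerType or DoubleType rather than LongType compiles fine and only fails later, inside a task. Printing the schemas makes the real types visible:

// printSchema shows the true Spark SQL types; getAs[Long] on an int or
// double column throws ClassCastException only at task execution time.
vertices_complete.printSchema()
edges_DF.printSchema()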

Any operation on this graph, even PageRank (I also tried shortest paths, and the error persists):

val ranks = my_graph.pageRank(0.0001).vertices
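
The shortest-paths attempt looked roughly like this (a sketch rather than the exact notebook code; ShortestPaths.run(graph, landmarks) is the standard GraphX API, and the landmark vertex 0L is made up for illustration):

import org.apache.spark.graphx.lib.ShortestPaths

// Hypothetical landmark: vertex 0L stands in for a real stop of interest.
val shortest = ShortestPaths.run(my_graph, Seq(0L)).vertices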

Either call raises the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 233.0 failed 1 times, most recent failure: Lost task 5.0 in stage 233.0 (TID 9390, DESKTOP-A7EPMQG.mshome.net, executor driver): java.lang.ClassCastException

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2209)
  at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1157)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
  at org.apache.spark.rdd.RDD.fold(RDD.scala:1151)
  at org.apache.spark.graphx.impl.VertexRDDImpl.count(VertexRDDImpl.scala:90)
  at org.apache.spark.graphx.Pregel$.apply(Pregel.scala:140)
  at org.apache.spark.graphx.lib.PageRank$.runUntilConvergenceWithOptions(PageRank.scala:431)
  at org.apache.spark.graphx.lib.PageRank$.runUntilConvergence(PageRank.scala:346)
  at org.apache.spark.graphx.GraphOps.pageRank(GraphOps.scala:380)
  ... 40 elided
Caused by: java.lang.ClassCastException

I think something is wrong with the initialization of the RDD objects (besides the weight, I would also like to attach the other attributes [time, walk_time, and so on] to the edges), but I cannot figure out how to do it correctly. Any help?
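
A minimal sketch of one way to attach several edge attributes, assuming the schema shown above (EdgeAttr is a made-up case class, not part of GraphX). Note that weight prints as 2640.0, i.e. a DoubleType column, so getAs[Long]("weight") in the code above is itself a likely source of the ClassCastException:

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD

// Made-up container for the per-edge attributes; any serializable type can
// serve as the edge type parameter of Graph.
case class EdgeAttr(time: Long, method: String, weight: Double,
                    walkTime: Long, waitTime: Long, busTime: Long)

val edgesAttrRDD: RDD[Edge[EdgeAttr]] = edges_DF.rdd.map { row =>
  // Going through java.lang.Number tolerates IntegerType vs LongType columns,
  // another common cause of the ClassCastException seen above.
  def asLong(name: String): Long = row.getAs[Number](name).longValue()
  Edge(
    asLong("src"),
    asLong("dst"),
    EdgeAttr(
      time     = asLong("time"),
      method   = row.getAs[String]("method"),
      weight   = row.getAs[Number]("weight").doubleValue(),
      walkTime = asLong("walk_time"),
      waitTime = asLong("wait_time"),
      busTime  = asLong("bus_time")))
}

val graphWithAttrs = Graph(verticesRDD, edgesAttrRDD)

Because RDD transformations are lazy, a bad cast in the map only surfaces when an action such as pageRank forces evaluation, which matches the error appearing in stage 233 rather than at graph construction.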
