I am running a Scala program in a Jupyter notebook with the spylon kernel, performing some operations on a network.
After some preprocessing I end up with two DataFrames, one for the nodes and one for the edges, with the following contents:
For the nodes:
+---+--------------------+-------+--------+-----+
| id| node|trip_id| stop_id| time|
+---+--------------------+-------+--------+-----+
| 0|0b29d98313189b650...| 209518|u0007405|56220|
| 1|45adb49a23257198e...| 209518|u0007409|56340|
| 2|fe5f4e2dc48b97f71...| 209518|u0007406|56460|
| 3|7b32330b6fe10b073...| 209518|u0007407|56580|
+---+--------------------+-------+--------+-----+
only showing top 4 rows
vertices_complete: org.apache.spark.sql.DataFrame = [id: bigint, node: string ... 3 more fields]
For the edges:
+------+-----+----+------+------+---------+---------+--------+
| src| dst|time|method|weight|walk_time|wait_time|bus_time|
+------+-----+----+------+------+---------+---------+--------+
| 65465|52067|2640| walk|2640.0| 1112| 1528| 0|
| 68744|52067|1740| walk|1740.0| 981| 759| 0|
| 55916|52067|2700| walk|2700.0| 1061| 1639| 0|
|124559|52067|1440| walk|1440.0| 1061| 379| 0|
| 23036|52067|1800| walk|1800.0| 1112| 688| 0|
+------+-----+----+------+------+---------+---------+--------+
only showing top 5 rows
edges_DF: org.apache.spark.sql.DataFrame = [src: bigint, dst: bigint ... 6 more fields]
I want to build a Graph object from these so that I can run PageRank, find shortest paths, and so on. So I convert the DataFrames to RDDs:
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx.{Edge, Graph, VertexId}

val verticesRDD: RDD[(VertexId, (String, Long, String, Long))] = vertices_complete.rdd
  .map(row =>
    (row.getAs[Long](0),
      (row.getAs[String]("node"), row.getAs[Long]("trip_id"), row.getAs[String]("stop_id"), row.getAs[Long]("time"))))

val edgesRDD: RDD[Edge[Long]] = edges_DF.rdd
  .map(row =>
    Edge(row.getAs[Long]("src"), row.getAs[Long]("dst"), row.getAs[Long]("weight")))

val my_graph = Graph(verticesRDD, edgesRDD)
Any operation on this graph, even just PageRank (I also tried shortest paths, and the error is the same), for example:
val ranks = my_graph.pageRank(0.0001).vertices
throws the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 233.0 failed 1 times, most recent failure: Lost task 5.0 in stage 233.0 (TID 9390, DESKTOP-A7EPMQG.mshome.net, executor driver): java.lang.ClassCastException
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2209)
at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1157)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
at org.apache.spark.rdd.RDD.fold(RDD.scala:1151)
at org.apache.spark.graphx.impl.VertexRDDImpl.count(VertexRDDImpl.scala:90)
at org.apache.spark.graphx.Pregel$.apply(Pregel.scala:140)
at org.apache.spark.graphx.lib.PageRank$.runUntilConvergenceWithOptions(PageRank.scala:431)
at org.apache.spark.graphx.lib.PageRank$.runUntilConvergence(PageRank.scala:346)
at org.apache.spark.graphx.GraphOps.pageRank(GraphOps.scala:380)
... 40 elided
Caused by: java.lang.ClassCastException
I think something is wrong with the way I initialize the RDDs (besides the weight, I would also like to attach the other attributes [time, walk_time, and so on] to the edges), but I don't know how to do it properly. Any help?
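To make the goal concrete, this is roughly what I am aiming for: edges that carry all of the attributes instead of just the weight. The EdgeAttr case class is just a name I made up for illustration, and the types I pass to getAs are guesses (I have not verified the actual column types in the schema, which may well be part of the problem):

case class EdgeAttr(weight: Double, time: Long, walkTime: Long, waitTime: Long, busTime: Long)

// Same edges_DF as above, but keeping every column as an edge attribute.
// Column types (Double vs Long vs Int) are unverified guesses on my part.
val edgesAttrRDD: RDD[Edge[EdgeAttr]] = edges_DF.rdd
  .map(row =>
    Edge(
      row.getAs[Long]("src"),
      row.getAs[Long]("dst"),
      EdgeAttr(
        row.getAs[Double]("weight"),
        row.getAs[Long]("time"),
        row.getAs[Long]("walk_time"),
        row.getAs[Long]("wait_time"),
        row.getAs[Long]("bus_time"))))

val my_graph2 = Graph(verticesRDD, edgesAttrRDD)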