Here is my code:
val rdd = session.sparkContext
  .parallelize(offsets, offsets.size)
  .flatMap[Row](offset => {
    // Page through the Cypher query for this partition's offset range
    val query  = s"${config.exec} SKIP ${offset.start} LIMIT ${offset.size}"
    val result = new Neo4jSessionAwareIterator(neo4jConfig, query, Maps.newHashMap(), false)
    // Derive the schema from the first record, if there is one
    val fields = if (result.hasNext) result.peek().keys().asScala else List()
    val schema =
      if (result.hasNext)
        StructType(
          fields
            .map(k => (k, result.peek().get(k).`type`()))
            .map(keyType => CypherTypes.field(keyType)))
      else new StructType()
    // Convert each Neo4j record into a Row carrying that schema
    result.map(record => {
      val row = new Array[Any](record.keys().size())
      for (i <- row.indices)
        row.update(i, Executor.convert(record.get(i).asObject()))
      new GenericRowWithSchema(row, schema).asInstanceOf[Row]
    })
  })

if (rdd.isEmpty())
  throw new RuntimeException()
val schema = rdd.repartition(1).first().schema
session.createDataFrame(rdd, schema)
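
For reference, here is a minimal, self-contained sketch of the same pattern without the Neo4j parts, so it can be run standalone (the names below are only illustrative, not from my project): a flatMap that returns an Iterator[Row] per partition, then createDataFrame with the schema taken from the first Row, followed by the same rdd.isEmpty() check.

// Minimal sketch of the pattern above (illustrative names, no Neo4j dependency)
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object FlatMapRowSketch {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("flatmap-row-sketch").getOrCreate()

    // One fixed schema per partition, standing in for the schema built from result.peek()
    val partSchema = StructType(Seq(StructField("value", StringType)))

    val rdd = session.sparkContext
      .parallelize(Seq(0, 1, 2), 3)
      .flatMap[Row] { offset =>
        // Return an Iterator[Row], just as result.map(...) does in the real code
        Iterator.single(
          new GenericRowWithSchema(Array[Any](s"row-$offset"), partSchema)
            .asInstanceOf[Row])
      }

    // Same action that triggers the exception in the question
    if (rdd.isEmpty())
      throw new RuntimeException()

    val schema = rdd.first().schema
    session.createDataFrame(rdd, schema).show()
  }
}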
This works correctly when I run it with spark-submit on the server, or from the IDE on my machine, against spark-2.4.6. But when I run it with spark-submit on my machine or on the server against spark-3.0, a ClassCastException is thrown at rdd.isEmpty().
Here is my exception message (partial):
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.collection.GenTraversableOnce
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:488)
at scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:266)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
at scala.collection.AbstractIterator.to(Iterator.scala:1429)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1409)
Driver stacktrace:
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
at org.apache.spark.rdd.RDD.take(RDD.scala:1382)
at org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1517)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1517)
at com.xx.xx.tools.importer.reader.Neo4JReader.read(ServerBaseReader.scala:146)
Thank you for your help!!!