In my Spark (2.4) Scala script I use methods from an external class written with the AWS SDK for Java. These methods are mostly simple operations on S3, such as deleting a folder:
import java.io.File
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.{ListObjectsV2Request, PutObjectRequest}
import scala.collection.JavaConverters._

object S3Utils {

  // listFiles is one of the "other methods"; a minimal reconstruction is shown
  // here so the snippet is self-contained (assumes a plain ListObjectsV2 call).
  def listFiles(bucket: String, prefix: String): List[String] = {
    val s3Client = AmazonS3ClientBuilder.standard().build()
    val request = new ListObjectsV2Request().withBucketName(bucket).withPrefix(prefix)
    s3Client.listObjectsV2(request).getObjectSummaries.asScala.map(_.getKey).toList
  }

  // Deletes every object under the given prefix, deepest keys first.
  def deleteS3Folder(bucket: String, key: String): Unit = {
    val s3Client = AmazonS3ClientBuilder.standard().build()
    val keyList = listFiles(bucket, key)
    if (keyList.nonEmpty) keyList.reverse.foreach(s3Client.deleteObject(bucket, _))
  }

  // other methods....
}
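On the driver the helper is called directly; for example (bucket and prefix here are placeholders, not my real values):

S3Utils.deleteS3Folder("my-bucket", "path/to/folder/")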
The problem is that during some unrelated RDD operations these helpers end up serialized and shipped to other executor nodes, which raises errors such as:
Exception in task 25.1 in stage 16.0 (TID 370)
java.io.InvalidClassException: script.MyScript$$anonfun$1; local class incompatible: stream classdesc serialVersionUID = 0, local class serialVersionUID = -8902657938625313429
in a script like this (I use GeoSpark for the RDDs; MyScript$$anonfun$1 in the stack trace should be the map lambda below):
import org.datasyslab.geospark.spatialOperator.JoinQuery
import org.datasyslab.geosparksql.utils.Adapter

object MyScript {
  def main(sysArgs: Array[String]): Unit = {
    // firstDF and secondDF from spark.read.format("csv")...
    val firstRDD = Adapter.toSpatialRdd(firstDF, "geometry_point")
    val secondRDD = Adapter.toSpatialRdd(secondDF, "geometry_point")

    // usingIndex and considerBoundaryIntersection are Boolean flags defined earlier
    val joinResultPairRDD =
      JoinQuery.SpatialJoinQueryFlat(firstRDD, secondRDD, usingIndex, considerBoundaryIntersection)
        .rdd
        .map { x =>
          // getUserData carries the attribute columns as a tab-separated string
          val pcsFields = x._1.getUserData.asInstanceOf[String].split("\t")
          (x._2.toText, pcsFields(0), pcsFields(1), pcsFields(2))
        }

    S3Utils.deleteS3Folder(....)
  }
}
I tried changing S3Utils from an object to a class and then to a trait (the class variant is sketched below), but nothing changed. Does anyone know how to create such callable external methods correctly?
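For reference, the class variant was essentially the same code with the object keyword swapped for class and an instance created on the driver; a sketch (bucket and prefix are placeholders):

// the class variant I tried: same method bodies, instantiated on the driver
class S3Utils {
  def deleteS3Folder(bucket: String, key: String): Unit = {
    val s3Client = AmazonS3ClientBuilder.standard().build()
    val keyList = listFiles(bucket, key)
    if (keyList.nonEmpty) keyList.reverse.foreach(s3Client.deleteObject(bucket, _))
  }
  // listFiles and the other methods, unchanged
}

// driver-side usage (placeholder arguments):
val s3Utils = new S3Utils
s3Utils.deleteS3Folder("my-bucket", "path/to/folder/")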