How can I avoid serialization problems in Spark when using methods from different AWS SDK classes?

fiei3ece, posted 2021-05-27 in Spark

In my Spark (2.4) Scala script I use methods from an external helper object written with the AWS SDK for Java. Most of these methods are simple S3 operations, such as deleting a folder:

import java.io.File
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.{ListObjectsV2Request, PutObjectRequest}
import scala.collection.JavaConverters._

object S3Utils {

  def deleteS3Folder(bucket: String, key: String) = {
    val s3Client = AmazonS3ClientBuilder.standard().build()
    val keyList = listFiles(bucket, key)
    if (keyList.nonEmpty) keyList.reverse.foreach(s3Client.deleteObject(bucket, _))
  }

  // other methods....
}
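The listFiles helper called in deleteS3Folder is one of the elided "other methods"; roughly, it is a paginated ListObjectsV2 call that collects every key under a prefix (that is what the ListObjectsV2Request import is for). A minimal sketch of such a method, sitting inside S3Utils next to the others and reusing the imports above; my real version differs in its details:

  // Sketch only: list every object key under the given prefix,
  // following continuation tokens for prefixes with many objects.
  def listFiles(bucket: String, prefix: String): List[String] = {
    val s3Client = AmazonS3ClientBuilder.standard().build()
    val request = new ListObjectsV2Request().withBucketName(bucket).withPrefix(prefix)
    val keys = scala.collection.mutable.ListBuffer[String]()
    var result = s3Client.listObjectsV2(request)
    keys ++= result.getObjectSummaries.asScala.map(_.getKey)
    while (result.isTruncated) {
      request.setContinuationToken(result.getNextContinuationToken)
      result = s3Client.listObjectsV2(request)
      keys ++= result.getObjectSummaries.asScala.map(_.getKey)
    }
    keys.toList
  }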

The problem is that during some unrelated RDD operations these helpers get serialized and shipped to the other executor nodes. This produces errors such as

Exception in task 25.1 in stage 16.0 (TID 370)
java.io.InvalidClassException: script.MyScript$$anonfun$1; local class incompatible: stream classdesc serialVersionUID = 0, local class serialVersionUID = -8902657938625313429

in a script like this (I use GeoSpark for the RDDs):

object MyScript {
   def main(sysArgs: Array[String]): Unit = {

     // firstDF and secondDF from spark.read.format("csv")...

     var firstRDD = Adapter.toSpatialRdd(firstDF, "geometry_point")
     var secondRDD = Adapter.toSpatialRdd(secondDF, "geometry_point")

     val joinResultPairRDD = JoinQuery.SpatialJoinQueryFlat(firstRDD, secondRDD, usingIndex, considerBoundaryIntersection).rdd.map { x =>
       val pcsFields = x._1.getUserData.asInstanceOf[String].split("\t")
       (x._2.toText, pcsFields(0), pcsFields(1), pcsFields(2))
     }

     S3Utils.deleteS3Folder(....)
   }
}
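To make the failure concrete: the closure passed to .map above is compiled into an anonymous class (the script.MyScript$$anonfun$1 in the stack trace), and that class is what gets serialized to the executors. A purely hypothetical sketch of one workaround commonly suggested for this kind of InvalidClassException, namely replacing the inline lambda with a named function object whose serialVersionUID is pinned so that driver and executors agree on the class descriptor (the names are mine, and I have not verified that it fixes my case):

import org.locationtech.jts.geom.Geometry  // or com.vividsolutions.jts.geom.Geometry, depending on the GeoSpark version

// Hypothetical sketch only, not verified against my job: a named function object with a
// pinned serialVersionUID instead of an anonymous lambda, so the serialized class
// descriptor cannot differ between driver and executors.
@SerialVersionUID(1L)
object ExtractJoinFields extends (((Geometry, Geometry)) => (String, String, String, String)) with Serializable {
  def apply(x: (Geometry, Geometry)): (String, String, String, String) = {
    val pcsFields = x._1.getUserData.asInstanceOf[String].split("\t")
    (x._2.toText, pcsFields(0), pcsFields(1), pcsFields(2))
  }
}

// and in main, the join would map through the named object instead of the inline lambda:
// JoinQuery.SpatialJoinQueryFlat(firstRDD, secondRDD, usingIndex, considerBoundaryIntersection)
//   .rdd.map(ExtractJoinFields)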

I tried changing S3Utils from an object to a class and to a trait, but nothing changed (a rough sketch of the class variant is below). Do you know how to write external helper methods like these so they can be called from a Spark job without these serialization problems?
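For reference, the class variant amounted to roughly the following (sketch only, the exact code differed):

// Rough reconstruction of the class variant I tried: the same methods,
// but on a Serializable class instantiated where needed instead of a top-level object.
class S3UtilsClass extends Serializable {

  def deleteS3Folder(bucket: String, key: String): Unit = {
    val s3Client = AmazonS3ClientBuilder.standard().build()
    val keyList = listFiles(bucket, key)
    if (keyList.nonEmpty) keyList.reverse.foreach(s3Client.deleteObject(bucket, _))
  }

  // other methods....
}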

No answers yet.
