Spark DataFrame filter function throws Task not serializable exception

gcxthw6b asked on 2021-05-27 in Spark

I am trying to filter DataFrame/Dataset records using the filter function with a Scala anonymous function, but it throws a Task not serializable exception. Could someone please look at the code and explain what is wrong with it?

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder()
  .appName("test data frame")
  .master("local[*]")
  .getOrCreate()

val user_seq = Seq(
  Row(1, "John", "London"),
  Row(1, "Martin", "New York"),
  Row(1, "Abhishek", "New York")
)
val user_schema = StructType(
  Array(
    StructField("user_id", IntegerType, true),
    StructField("user_name", StringType, true),
    StructField("user_city", StringType, true)
  ))

var user_df = spark.createDataFrame(spark.sparkContext.parallelize(user_seq), user_schema)
var user_rdd = user_df.filter((item)=>{
  return item.getString(2) == "New York"   // this anonymous function is the one Spark fails to serialize
})
user_rdd.count()

I can see the following exception on the console. When I filter the data by column name instead, it works fine (see the sketch after the log output below).

objc[48765]: Class JavaLaunchHelper is implemented in both /Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home/bin/java (0x1059db4c0) and /Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home/jre/lib/libinstrument.dylib (0x105a5f4e0). One of the two will be used. Which one is undefined.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/07/18 20:10:09 INFO SparkContext: Running Spark version 2.4.6
20/07/18 20:10:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/07/18 20:10:09 INFO SparkContext: Submitted application: test data frame
20/07/18 20:10:09 INFO SecurityManager: Changing view acls groups to: 
20/07/18 20:10:09 INFO SecurityManager: Changing modify acls groups to: 
20/07/18 20:10:12 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
20/07/18 20:10:12 INFO ContextCleaner: Cleaned accumulator 0
20/07/18 20:10:13 INFO CodeGenerator: Code generated in 170.789451 ms
20/07/18 20:10:13 INFO CodeGenerator: Code generated in 17.729004 ms
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:406)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:163)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:872)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:871)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
    at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:871)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:630)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:151)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2836)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2835)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:2835)
    at DataFrameTest$.main(DataFrameTest.scala:65)
    at DataFrameTest.main(DataFrameTest.scala)
Caused by: java.io.NotSerializableException: java.lang.Object
Serialization stack:
    - object not serializable (class: java.lang.Object, value: java.lang.Object@cec590c)
    - field (class: DataFrameTest$$anonfun$1, name: nonLocalReturnKey1$1, type: class java.lang.Object)
    - object (class DataFrameTest$$anonfun$1, <function1>)
    - element of array (index: 1)
    - array (class [Ljava.lang.Object;, size 5)
    - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13, name: references$1, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13, <function2>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
    ... 48 more
20/07/18 20:10:13 INFO SparkContext: Invoking stop() from shutdown hook
20/07/18 20:10:13 INFO SparkUI: Stopped Spark web UI at http://192.168.31.239:4040
20/07/18 20:10:13 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/07/18 20:10:13 INFO MemoryStore: MemoryStore cleared
20/07/18 20:10:13 INFO BlockManager: BlockManager stopped
20/07/18 20:10:13 INFO BlockManagerMaster: BlockManagerMaster stopped
20/07/18 20:10:13 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/07/18 20:10:13 INFO SparkContext: Successfully stopped SparkContext
20/07/18 20:10:13 INFO ShutdownHookManager: Shutdown hook called
20/07/18 20:10:13 INFO ShutdownHookManager: Deleting directory /private/var/folders/33/3n6vtfs54mdb7x6882fyqy4mccfmvg/T/spark-3e071448-7ad7-47b8-bf70-68ab74721aa2

Process finished with exit code 1
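
For reference, the column-name filter mentioned above as working is not shown in the post; a minimal sketch of what it presumably looks like, assuming the same user_df:

// The predicate is built as a Column expression from the DataFrame itself,
// so no Scala closure from the driver-side method needs to be serialized.
user_df.filter(user_df("user_city") === "New York").count()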

mefy6pfw 1#

Remove the return keyword from the line below. In Scala, return inside an anonymous function is a non-local return from the enclosing method; the compiler implements it by capturing a plain java.lang.Object key from that method (the nonLocalReturnKey1$1 field of DataFrameTest$$anonfun$1 shown in the serialization stack above), and java.lang.Object is not serializable, so Spark cannot ship the filter closure to the executors. The last expression of a lambda is already its result, so the return is unnecessary.

Change the following code

var user_rdd = user_df.filter((item)=>{
    return item.getString(2) == "New York"
})

to an underscore-style predicate, var user_rdd = user_df.filter(_.getString(2) == "New York"), or filter on the column directly, user_df.filter($"user_city" === "New York").count. You can also refactor the code as below.

import spark.implicits._   // needed for toDF and the $"..." column syntax

val df = Seq((1,"John","London"),(1,"Martin","New York"),(1,"Abhishek","New York"))
  .toDF("user_id","user_name","user_city")

df.filter($"user_city" === "New York").count
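
If you prefer to keep the anonymous-function style from the question, simply dropping the return is enough, because the last expression of the lambda is its result. A minimal sketch against the same user_df (the val name is only illustrative):

// Same predicate as in the question, without the non-local return, so the closure
// captures nothing non-serializable and Spark can send it to the executors.
val newYorkUsers = user_df.filter(item => item.getString(2) == "New York")
newYorkUsers.count()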
