Given a simple UDAF such as the code below, I get an error when it is compiled and run against certain DataFrames. I know that for UDFs we need to do something special, as in the answer below. What is the equivalent for a UDAF?
contents =
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scala.collection.mutable.ArrayBuffer

// Group-concat aggregate: collects the input strings per group and joins them with ",".
class GroupConcatUdafZ(distinct: Boolean) extends UserDefinedAggregateFunction with Serializable {

  def inputSchema = new StructType().add("x", StringType)
  def bufferSchema = new StructType().add("buff", ArrayType(StringType))
  def dataType = StringType
  def deterministic = true

  override def initialize(buffer: MutableAggregationBuffer) = {
    buffer.update(0, ArrayBuffer.empty[String])
  }

  // Append the incoming value, skipping nulls and (optionally) duplicates.
  override def update(buffer: MutableAggregationBuffer, input: Row) = {
    if (!input.isNullAt(0))
      if (distinct && !buffer.getSeq[String](0).contains(input.getString(0)) || !distinct)
        buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))
  }

  // Combine two partial buffers, de-duplicating again when distinct is requested.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    val mergedResult = buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0)
    buffer1.update(0, if (distinct) mergedResult.distinct else mergedResult)
  }

  override def evaluate(buffer: Row) =
    buffer.getSeq[String](0).mkString(",")
}
spark.udf.register("GroupConcatUdafZ", new GroupConcatUdafZ(false))
spark.sql("select Date, GroupConcatUdafZ(Name) from (select * from some_table limit 3) group by Date")
and the existing abstract class that the compiled code is cast to:
package somePackage

import org.apache.spark.sql.SparkSession

abstract class MyClass {
  def process(spark: SparkSession): Any
}
The code above is compiled and run with:
import scala.reflect.runtime.{currentMirror, universe => ru}
import scala.tools.reflect.ToolBox

val toolBox: ToolBox[ru.type] = currentMirror.mkToolBox()
val tree: toolBox.u.Tree = toolBox.parse(s"import $packageName;$contents")
val compiledCode: () => Any = toolBox.compile(tree)
val myClass: MyClass = compiledCode().asInstanceOf[MyClass]
myClass.process(spark)
where packageName is the package through which the existing MyClass is accessible, and contents is the code above as a string.
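(Put together, the driver-side flow is roughly the following — just a sketch restating the snippet above as one method, assuming spark is the active SparkSession:)

import scala.reflect.runtime.{currentMirror, universe => ru}
import scala.tools.reflect.ToolBox
import org.apache.spark.sql.SparkSession
import somePackage.MyClass

def compileAndRun(spark: SparkSession, packageName: String, contents: String): Any = {
  val toolBox: ToolBox[ru.type] = currentMirror.mkToolBox()
  // Prepend the import so the dynamically compiled source can resolve MyClass.
  val tree: toolBox.u.Tree = toolBox.parse(s"import $packageName;$contents")
  val compiledCode: () => Any = toolBox.compile(tree)
  compiledCode().asInstanceOf[MyClass].process(spark)
}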
The error:
16:54:23 [Driver] INFO DAGScheduler {org.apache.spark.internal.Logging$class logInfo} - Job 1 failed: save at SparkUtils.scala:165, took 0.566217 s
16:54:23 [Driver] ERROR FileFormatWriter {org.apache.spark.internal.Logging$class logError} - Aborting job null.
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: __wrapper$1$a9f023af34c0411e98d22696d01fb0a4.__wrapper$1$a9f023af34c0411e98d22696d01fb0a4$$anon$1
Serialization stack:
- object not serializable (class: __wrapper$1$a9f023af34c0411e98d22696d01fb0a4.__wrapper$1$a9f023af34c0411e98d22696d01fb0a4$$anon$1, value: __wrapper$1$a9f023af34c0411e98d22696d01fb0a4.__wrapper$1$a9f023af34c0411e98d22696d01fb0a4$$anon$1@761332fe)
- field (class: __wrapper$1$a9f023af34c0411e98d22696d01fb0a4.__wrapper$1$a9f023af34c0411e98d22696d01fb0a4$$anon$1$GroupConcatUdafZ$1, name: $outer, type: class __wrapper$1$a9f023af34c0411e98d22696d01fb0a4.__wrapper$1$a9f023af34c0411e98d22696d01fb0a4$$anon$1)
- object (class __wrapper$1$a9f023af34c0411e98d22696d01fb0a4.__wrapper$1$a9f023af34c0411e98d22696d01fb0a4$$anon$1$GroupConcatUdafZ$1, __wrapper$1$a9f023af34c0411e98d22696d01fb0a4.__wrapper$1$a9f023af34c0411e98d22696d01fb0a4$$anon$1$GroupConcatUdafZ$1@141c64dc)
- field (class: org.apache.spark.sql.execution.aggregate.ScalaUDAF, name: udaf, type: class org.apache.spark.sql.expressions.UserDefinedAggregateFunction)
- object (class org.apache.spark.sql.execution.aggregate.ScalaUDAF, GroupConcatUdafZ$1(Name#0))
- field (class: org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression, name: aggregateFunction, type: class org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction)
- object (class org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression, partial_groupconcatudafz$1(Name#0, __wrapper$1$a9f023af34c0411e98d22696d01fb0a4.__wrapper$1$a9f023af34c0411e98d22696d01fb0a4$$anon$1$GroupConcatUdafZ$1@141c64dc, 0, 0))
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@6ea57eb2)
- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, List(partial_groupconcatudafz$1(Name#0, __wrapper$1$a9f023af34c0411e98d22696d01fb0a4.__wrapper$1$a9f023af34c0411e98d22696d01fb0a4$$anon$1$GroupConcatUdafZ$1@141c64dc, 0, 0)))
- field (class: org.apache.spark.sql.execution.aggregate.SortAggregateExec, name: aggregateExpressions, type: interface scala.collection.Seq)
- object (class org.apache.spark.sql.execution.aggregate.SortAggregateExec, SortAggregate(key=[Date#5], functions=[partial_groupconcatudafz$1(Name#0, __wrapper$1$a9f023af34c0411e98d22696d01fb0a4.__wrapper$1$a9f023af34c0411e98d22696d01fb0a4$$anon$1$GroupConcatUdafZ$1@141c64dc, 0, 0)], output=[Date#5, buff#23])
+- *(2) Sort [Date#5 ASC NULLS FIRST], false, 0
+- *(2) GlobalLimit 3
+- Exchange SinglePartition
+- *(1) LocalLimit 3
+- *(1) FileScan orc default.some_table[Name#0,Date#5] Batched: true, Format: ORC, Location: InMemoryFileIndex[hdfs://....], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Name:string,Date:string>
)
- field (class: org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1, name: $outer, type: class org.apache.spark.sql.execution.aggregate.SortAggregateExec)
- object (class org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1, <function0>)
- field (class: org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1$$anonfun$3, name: $outer, type: class org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1)
- object (class org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1$$anonfun$3, <function2>)
- field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1, name: f$22, type: interface scala.Function2)
- object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1, <function0>)
- field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1)
- object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24, <function3>)
- field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3)
- object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[20] at save at SparkUtils.scala:165)
- field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
- object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@250937f2)
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@7a1da85b)
- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@250937f2))
- field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
- object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[21] at save at SparkUtils.scala:165)
- field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
- object (class org.apache.spark.rdd.CoalescedRDD$$anon$1, org.apache.spark.rdd.CoalescedRDD$$anon$1@6ca54bce)
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@30e2a317)
- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.rdd.CoalescedRDD$$anon$1@6ca54bce))
- field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
- object (class org.apache.spark.rdd.CoalescedRDD, CoalescedRDD[22] at save at SparkUtils.scala:165)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (CoalescedRDD[22] at save at SparkUtils.scala:165,<function2>))
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1043)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:947)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$submitWaitingChildStages$6.apply(DAGScheduler.scala:788)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$submitWaitingChildStages$6.apply(DAGScheduler.scala:787)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.spark.scheduler.DAGScheduler.submitWaitingChildStages(DAGScheduler.scala:787)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1296)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1869)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
at com.cccusa.common.utilities.SparkUtils$.saveDfToNFS(SparkUtils.scala:165)
at com.cccusa.common.utilities.SparkUtils$.writeTable(SparkUtils.scala:569)
at com.cccusa.analysis.utilities.AnalysisUtil$.writeTable(AnalysisUtil.scala:461)
at __wrapper$1$a9f023af34c0411e98d22696d01fb0a4.__wrapper$1$a9f023af34c0411e98d22696d01fb0a4$$anon$1.process(<no source file>:73)
at com.cccusa.analysis.ScriptAnalysisProcessor.run(ScriptAnalysisProcessor.scala:161)
at com.cccusa.analysis.Analysis$.main(Analysis.scala:79)
at com.cccusa.analysis.Analysis.main(Analysis.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)
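(The non-serializable object at the top of that stack appears to be the toolbox-generated wrapper that the compiled UDAF class keeps as its $outer. The failure can be reproduced off-cluster with a plain Java serialization round-trip of the UDAF instance — a diagnostic sketch, where udafInstance is a hypothetical handle to whatever instance the compiled code ends up registering:)

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Throws java.io.NotSerializableException if the object (transitively) references
// something that is not Serializable, e.g. the toolbox wrapper via $outer.
def checkSerializable(obj: AnyRef): Unit = {
  val out = new ObjectOutputStream(new ByteArrayOutputStream())
  try out.writeObject(obj) finally out.close()
}

// checkSerializable(udafInstance)   // hypothetical reference to the toolbox-created UDAF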