The code worked fine until I made some changes, because I needed to implement rotatekey as a UDF.
I am missing something, though, because I now get this error:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
...
...
at playground.RotatingKeys.run(RotatingKeys.scala:25)
at playground.Main$.main(RotatingKeys.scala:37)
at playground.Main.main(RotatingKeys.scala)
Caused by: java.io.NotSerializableException: playground.RotatingKeys
Serialization stack:
- object not serializable (class: playground.RotatingKeys, value: playground.RotatingKeys@e07b4db)
Here is the code:
import org.apache.logging.log4j.{LogManager, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.expressions.UserDefinedFunction

class RotatingKeys(spark: SparkSession, nRotations: Integer) {
  import spark.implicits._

  val logger: Logger = LogManager.getLogger(getClass)
  logger.info("Initializing KeyRotatorJob")

  def rotateKeyUdf: UserDefinedFunction = {
    udf{ key: String => rotatekey(key, nRotations) }
  }

  def rotatekey(key: String, nRotations: Integer): String =
    key.substring(nRotations) + key.substring(0, nRotations)

  def run(): Unit =
    spark
      .sql("SELECT '0123456' as key")
      .withColumn("rotated_key", rotateKeyUdf($"key"))
      .show()
}

object Main {
  val spark = SparkSession.builder()
    .appName("Run Trials")
    .config("spark.master", "local")
    .getOrCreate()

  def main(args: Array[String]): Unit = {
    val rkRun = new RotatingKeys(spark,4)
    rkRun.run()
  }
}
When everything works correctly, the output is:
+-------+-----------+
|    key|rotated_key|
+-------+-----------+
|0123456|    4560123|
+-------+-----------+
Please help, I would be very grateful.
1 Answer
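Don't use the class's members (variables or methods) directly inside the udf closure. Referencing rotatekey or nRotations there makes the closure capture the enclosing RotatingKeys instance, so Spark has to serialize the whole class, and that only works if the class is Serializable. Instead, pass the value in separately as a column.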
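A minimal sketch of the column approach, not necessarily the exact code the answer originally showed. It assumes nRotations can be declared as a Scala Int instead of Integer, uses lit from org.apache.spark.sql.functions to turn it into a literal column, and leaves out the logger for brevity:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{lit, udf}

class RotatingKeys(spark: SparkSession, nRotations: Int) {
  import spark.implicits._

  // The lambda only touches its own parameters, so the closure does not
  // capture `this` and RotatingKeys never has to be serialized.
  def rotateKeyUdf: UserDefinedFunction =
    udf { (key: String, n: Int) => key.substring(n) + key.substring(0, n) }

  def run(): Unit =
    spark
      .sql("SELECT '0123456' AS key")
      // nRotations is read on the driver and handed to the UDF as a literal column.
      .withColumn("rotated_key", rotateKeyUdf($"key", lit(nRotations)))
      .show()
}
```

Main stays as in the question: new RotatingKeys(spark, 4).run() should then print the table from the question instead of throwing.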
If you want to keep using rotatekey, make it a utility method and move it into an object.
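Roughly like this. It is a sketch under a few assumptions: the object name KeyUtils is made up for illustration, nRotations is again declared as Int, and the logger is omitted. Note that nRotations is copied into a local val first; referencing the constructor parameter directly inside the lambda would pull `this` back into the closure.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

// Hypothetical utility object (the name is an assumption). Methods on a
// top-level object are reached statically, so nothing here is serialized.
object KeyUtils {
  def rotatekey(key: String, nRotations: Int): String =
    key.substring(nRotations) + key.substring(0, nRotations)
}

class RotatingKeys(spark: SparkSession, nRotations: Int) {
  import spark.implicits._

  def rotateKeyUdf: UserDefinedFunction = {
    // Local copy: the closure captures this Int, not the RotatingKeys instance.
    val n = nRotations
    udf { key: String => KeyUtils.rotatekey(key, n) }
  }

  def run(): Unit =
    spark
      .sql("SELECT '0123456' AS key")
      .withColumn("rotated_key", rotateKeyUdf($"key"))
      .show()
}
```

With this version the call site in run() is the same as in the question, since the UDF still takes a single column.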