调用 UDF 函数时出现 Task not serializable 异常

bvn4nwqk  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(505)

代码原本运行良好，直到我为了把 rotatekey 实现为一个 UDF 函数而做了一些修改。我一定遗漏了什么，因为现在出现了下面这个错误：

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
  ...
  ...
at playground.RotatingKeys.run(RotatingKeys.scala:25)
at playground.Main$.main(RotatingKeys.scala:37)
at playground.Main.main(RotatingKeys.scala)
Caused by: java.io.NotSerializableException: playground.RotatingKeys
Serialization stack:
- object not serializable (class: playground.RotatingKeys, value: playground.RotatingKeys@e07b4db)

代码如下

import org.apache.logging.log4j.{LogManager, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.expressions.UserDefinedFunction

// Demonstrates the "Task not serializable" failure: the udf closure below
// references instance members, so Spark must serialize the whole
// RotatingKeys instance — which does not implement Serializable.
class RotatingKeys(spark: SparkSession, nRotations: Integer) {
  import spark.implicits._

  val logger: Logger = LogManager.getLogger(getClass)

  logger.info("Initializing KeyRotatorJob")

  // BUG: `rotatekey` and `nRotations` belong to `this`, so the lambda
  // captures the enclosing (non-serializable) instance — this is what
  // triggers the NotSerializableException in the stack trace above.
  def rotateKeyUdf: UserDefinedFunction = {
    udf{ key: String => rotatekey(key, nRotations) }
  }

  // Rotates `key` left by nRotations characters, e.g. ("0123456", 4) -> "4560123".
  // NOTE(review): substring throws StringIndexOutOfBoundsException when
  // nRotations is negative or greater than key.length.
  def rotatekey(key: String, nRotations: Integer): String =
    key.substring(nRotations) + key.substring(0, nRotations)

  // One-row demo query; fails at execution time with
  // SparkException: Task not serializable.
  def run(): Unit =
    spark
      .sql("SELECT '0123456' as key")
      .withColumn("rotated_key", rotateKeyUdf($"key"))
      .show()
}

// Entry point: creates a local-mode SparkSession and launches the
// key-rotation demo with a rotation count of 4.
object Main {
  // Session shared by the job; app name and master are fixed constants.
  val spark: SparkSession =
    SparkSession.builder().appName("Run Trials").config("spark.master", "local").getOrCreate()

  def main(args: Array[String]): Unit =
    new RotatingKeys(spark, 4).run()
}

在修改之前一切正常，输出如下：

+-------+-----------+
|    key|rotated_key|
+-------+-----------+
|0123456|    4560123|
+-------+-----------+

请帮忙,我会非常感激的。

hyrbngr7

hyrbngr71#

不要在 UDF 闭包中直接使用类的成员（变量/方法）——如果要直接使用，该类就必须是可序列化的。应当把需要的值作为列（例如通过 lit）单独传入：

import org.apache.log4j.LogManager
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.UserDefinedFunction

// SO=63064483
// Fixed version: the udf body is self-contained, so the closure no longer
// captures the (non-serializable) enclosing instance. The rotation count
// is delivered as a column (lit(nRotations)) instead of being read from `this`.
class RotatingKeys(spark: SparkSession, nRotations: Integer) {
  import spark.implicits._

  val logger = LogManager.getLogger(getClass)

  logger.info("Initializing KeyRotatorJob")

  // Two-argument UDF: both the key and the rotation count arrive as columns,
  // so nothing from the enclosing class is pulled into the serialized task.
  def rotateKeyUdf: UserDefinedFunction = {
    udf{ (key: String, nRotations: Integer) => key.substring(nRotations) + key.substring(0, nRotations) }
  }

  // Same demo query as before; now succeeds because the task closure is
  // serializable.
  def run(): Unit =
    spark
      .sql("SELECT '0123456' as key")
      .withColumn("rotated_key", rotateKeyUdf($"key", lit(nRotations)))
      .show()
}

// Entry point for the fixed job: builds a local SparkSession and runs the
// demo with 4 rotations.
object Main {
  // Local-mode session; configuration values are unchanged from the question.
  val spark: SparkSession =
    SparkSession.builder().appName("Run Trials").config("spark.master", "local").getOrCreate()

  def main(args: Array[String]): Unit =
    new RotatingKeys(spark, 4).run()
}

如果你想复用 rotatekey，可以把它做成工具方法并移到 object 中，如下所示：

import org.apache.log4j.LogManager
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.UserDefinedFunction

// SO=63064483
// Variant that delegates to a UDF defined on the Main object. Object
// members are referenced statically, so the udf closure does not capture
// this (non-serializable) class instance.
class RotatingKeys(spark: SparkSession, nRotations: Integer) {
  import spark.implicits._

  val logger = LogManager.getLogger(getClass)

  logger.info("Initializing KeyRotatorJob")

  // Uses Main.rotateKeyUdf and passes the rotation count as a literal column.
  def run(): Unit =
    spark
      .sql("SELECT '0123456' as key")
      .withColumn("rotated_key", Main.rotateKeyUdf($"key", lit(nRotations)))
      .show()
}

// Entry point plus serialization-safe UDF utilities. Because rotatekey
// lives on an object rather than a class instance, the udf closure
// captures no outer `this` and the job avoids Task-not-serializable.
object Main {
  // Local-mode session shared by the demo job.
  val spark: SparkSession =
    SparkSession.builder().appName("Run Trials").config("spark.master", "local").getOrCreate()

  // Rotate `key` left by `nRotations` characters,
  // e.g. rotatekey("0123456", 4) == "4560123".
  def rotatekey(key: String, nRotations: Integer): String =
    key.substring(nRotations) + key.substring(0, nRotations)

  // UDF wrapper over rotatekey; both inputs arrive as columns, so the
  // lambda closes over nothing but the object-level utility method.
  def rotateKeyUdf: UserDefinedFunction =
    udf { (key: String, nRotations: Integer) => rotatekey(key, nRotations) }

  def main(args: Array[String]): Unit =
    new RotatingKeys(spark, 4).run()
}

相关问题