scala 在spark-shell中使用全局对象时隐式瓦尔序列化

wljmcqd8  于 2023-03-02  发布在  Scala
关注(0)|答案(2)|浏览(134)

我不清楚为什么(不可序列化的)隐式瓦尔在这里被序列化(抛出异常):

implicit val sc2:SparkContext = sc
val s1 = "asdf"
sc.parallelize(Array(1,2,3)).map(x1 => s1.map(x => 4))

但当s1的值在闭包的作用域中时则不会:

implicit val sc2:SparkContext = sc
sc.parallelize(Array(1,2,3)).map(x1 => "asdf".map(x => 4))

我的用例显然更加复杂,但我将其归结为这个问题。
(The解决方案是将隐式瓦尔定义为@transient)

holgip5t

holgip5t1#

这取决于这些行所在的范围

让我们看看三个选项--在 * 方法 * 中、在没有s1的 * 类 * 中以及在有s1的 * 类 * 中:

object TTT {

  val sc = new SparkContext("local", "test")

  def main(args: Array[String]): Unit = {
    new A().foo()  // works
    new B          // works
    new C          // fails
  }

  class A {
    def foo(): Unit = {
      // no problem here: vars in a method can be serialized on their own
      implicit val sc2: SparkContext = sc
      val s1 = "asdf"
      sc.parallelize(Array(1, 2, 3)).map(x1 => s1.map(x => 4)).count()
      println("in A - works!")
    }
  }

  class B {
    // no problem here: B isn't serialized at all because there are no references to its members
    implicit val sc2: SparkContext = sc
    sc.parallelize(Array(1, 2, 3)).map(x1 => "asdf".map(x => 4)).count()
    println("in B - works!")
  }

  class C extends Serializable {
    implicit val sc2: SparkContext = sc
    val s1 = "asdf" // to serialize s1, Spark will try serializing the YYY instance, which will serialize sc2
    sc.parallelize(Array(1, 2, 3)).map(x1 => s1.map(x => 4)).count() // fails
  }

}

底线是,无论是否隐式,当且仅当s1sc2是一个类的成员时,这将失败,这意味着类必须被序列化,并将它们一起“拖动”。

xeufq47z

xeufq47z2#

作用域是spark-shell REPL。在这种情况下,sc2(以及顶层REPL作用域中定义的任何其他隐式值)只有在它是隐式的并且是RDD操作中使用的该作用域中的另一个值时才被序列化。这是有意义的,因为隐式值需要全局可用,因此会自动序列化到所有工作节点。

相关问题