返回运行时在Spark UDF中定义了模式的行

2sbarzqh  于 2023-02-05  发布在  Apache
关注(0)|答案(1)|浏览(117)

我的剑已经磨钝了,如果有人能帮上忙,我将不胜感激!

背景

我正在构建一个ETL管道,它从Kafka队列中取出GNMI Protobuf更新消息,并最终根据值路径的前缀和参数(例如DataBricks运行时)将它们分解为一堆增量表。
在不深入讨论细节的情况下,每个前缀大致对应于一个表的模式,但需要注意的是,上游的路径可能会改变(通常是新的子树),因此模式不是固定的,这类似于嵌套的JSON结构。
我首先通过前缀来分解更新,这样所有的更新都有大致相同的模式,我定义了一些转换,这样当模式不完全匹配时,我可以将它们强制到一个公共模式中。
当我试图用通用模式创建一个结构列时遇到了麻烦。

尝试1

我首先尝试从UDF返回一个Array[Any],并在UDF定义中提供一个schema(我知道这已经过时了):

import org.apache.spark.sql.{functions => F, Row, types => T}

  def mapToRow(deserialized: Map[String, ParsedValueV2]): Array[Any] = {
    def getValue(key: String): Any = {
        deserialized.get(key) match {
            case Some(value) => value.asType(columns(key))
            case None => None
        }
    }
    
    columns.keys.toArray.map(getValue).toArray
  }
  
  spark.conf.set("spark.sql.legacy.allowUntypedScalaUDF", "true")
  def mapToStructUdf = F.udf(mapToRow _, account.sparkSchemas(prefix))

这段代码创建了一个Array对象,其中包含我需要的类型值。不幸的是,当我尝试使用UDF时,我得到了这个错误:

java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to $line8760b7c10da04d2489451bb90ca42c6535.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$ParsedValueV2

我不确定什么是不匹配的,但是我注意到值的类型是Java类型,而不是scala,所以也许这是相关的?

尝试2

也许我可以使用类型化的UDF接口,我可以在运行时为每个模式创建一个case类,然后用它作为UDF的返回值吗?
我试着用我找到的各种东西来实现这个功能,比如:

import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox
val tb = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()
val test = tb.eval(tb.parse("object Test; Test"))

但我甚至无法获得test的示例,也不知道如何将其用作UDF的返回值。我假设我需要使用泛型类型,但我的scala-fu太弱,无法解决这个问题。
∮最后的问题是
有人能帮我弄清楚采取哪种方法,以及如何继续这种方法吗?
提前感谢您的帮助!!!

更新-这是一个Spark错误吗?

我把这个问题提炼成了这段代码:

import org.apache.spark.sql.{functions => F, Row, types => T}

// thanks @Dmytro Mitin
val spark = SparkSession.builder
  .master ("local")
  .appName ("Spark app")
  .getOrCreate ()

spark.conf.set("spark.sql.legacy.allowUntypedScalaUDF", "true")

def simpleFn(foo: Any): Seq[Any] = List("hello world", "Another String", 42L)
// def simpleFn(foo: Any): Seq[Any] = List("hello world", "Another String")

def simpleUdf = F.udf(
  simpleFn(_),
  dataType = T.StructType(
    List(
      T.StructField("a_string", T.StringType),
      T.StructField("another_string", T.StringType),
      T.StructField("an_int", T.IntegerType),
    )
  )
)

Seq(("bar", "foo"))
  .toDF("column", "input")
  .withColumn(
    "array_data",
    simpleUdf($"input")
  )
  .show(truncate=false)

这将导致此错误消息

IllegalArgumentException: The value (List(Another String, 42)) of the type (scala.collection.immutable.$colon$colon) cannot be converted to the string type

嗯......真奇怪。那个列表是从哪里来的,缺少了行的第一个元素?
两个值的版本(例如"hello world","Another String")也有同样的问题,但如果我的结构体中只有一个值,那么它就很好:

// def simpleFn(foo: Any): Seq[Any] = List("hello world", "Another String")
def simpleFn(foo: Any): Seq[Any] = List("hello world")

def simpleUdf = F.udf(
  simpleFn(_),
  dataType = T.StructType(
    List(
      T.StructField("a_string", T.StringType),
      // T.StructField("another_string", T.StringType),
      // T.StructField("an_int", T.IntegerType),
    )
  )
)

我的查询得到了

+------+-----+-------------+
|column|input|array_data   |
+------+-----+-------------+
|bar   |foo  |{hello world}|
+------+-----+-------------+

看起来它给了我序列的第一个元素作为结构体的第一个字段,它的其余部分作为结构体的第二个元素,然后第三个元素为null(在其他情况下可见),并导致异常。
在我看来这是一个bug。还有谁有过像这样动态构建模式的UDF的经验吗?
Spark 3.3.1,scala 2.12,分布式批处理器12.0
∮反思挣扎∮
要实现我想做的事情,一个愚蠢的方法是获取我推断出的模式,生成一堆scala代码来实现case类,我可以将其用作UDF的返回类型,然后编译代码,打包JAR,将其加载到我的databricks运行时,然后使用case类作为UDF的返回结果。
这看起来是一种非常复杂的方法,如果我能生成case类,然后做一些类似这样的事情就太好了

def myUdf[CaseClass](input: SomeInputType): CaseClass = 
    CaseClass(input.giveMeResults: _*)

问题是我不知道如何将我使用eval创建的类型放入当前的"上下文"(我不知道这里用什么词)。
此代码:

import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox
val tb = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()
val test = tb.eval(tb.parse("object Test; Test"))

给我这个:

...
test: Any = __wrapper$1$bb89c0cde37c48929fa9d8cdabeeb0f8.__wrapper$1$bb89c0cde37c48929fa9d8cdabeeb0f8$Test$1$@492531c0

我认为test是Test的一个示例,但是REPL中的类型系统不知道任何名为Test的类型,所以我不能使用test.asInstanceOf[Test]或类似的东西
我知道这是一个经常被问到的问题,但是我似乎在任何地方都找不到关于如何真正完成我上面描述的事情的答案。

aiazj4mn

aiazj4mn1#

关于“反思斗争”,我不清楚是否:1)你已经有了def myUdf[T] = ...,你只是想调用它来生成case类:myUdf[GeneratedClass]或2)您尝试基于生成的类定义def myUdf[T] = ...
1.在前一种情况下,应用途:

  • tb.define生成一个对象(或case类),它返回一个类符号(或模块符号),您可以进一步使用它(例如在类型位置)
  • tb.eval来调用方法(myUdf
object Main extends App {
  def myUdf[T](): Unit = println("myUdf")

  import scala.reflect.runtime.universe
  import universe.Quasiquote
  import scala.tools.reflect.ToolBox

  val tb = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()
  val testSymbol = tb.define(q"object Test")
  val test = tb.eval(q"$testSymbol")

  tb.eval(q"Main.myUdf[$testSymbol]()") // myUdf
}

在这个例子中,我修改了myUdf的签名(和主体),您应该使用您的实际签名。
1.在后一种情况下,您也可以在运行时定义myUdf

object Main extends App {
  import scala.reflect.runtime.universe
  import universe.Quasiquote
  import scala.tools.reflect.ToolBox

  val tb = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()
  val testSymbol = tb.define(q"object Test")
  val test = tb.eval(q"$testSymbol")

  val xSymbol = tb.define(
    q"""
      object X {
        def myUdf[T](): Unit = println("myUdf")
      }
    """
  )

  tb.eval(q"$xSymbol.myUdf[$testSymbol]()") //myUdf
}

你应该试着为普通情况编写myUdf,我们将把它翻译成运行时生成的。
所以我不能使用test.asInstanceOf[Test]或类似命令
是的,类型Test在编译时不存在,所以你不能这样使用它,它在运行时存在,所以你应该在准引号q"..."(或tb.parse("..."))中使用它。

object Main extends App {
  import scala.reflect.runtime.universe
  import universe.Quasiquote
  import scala.tools.reflect.ToolBox

  val tb = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()
  val testSymbol = tb.define(q"object Test")
  val test = tb.eval(q"$testSymbol")

  tb.eval(q"Main.test.asInstanceOf[${testSymbol.asModule.moduleClass.asClass.toType}]") // no exception, so test is an instance of Test
  tb.eval(q"Main.test.asInstanceOf[$testSymbol.type]") // no exception, so test is an instance of Test
  println(
    tb.eval(q"Main.test.getClass").asInstanceOf[Class[_]]
  ) // class __wrapper$1$0bbb246b633b472e8df54efc3e9ff9d9.Test$
  println(
    tb.eval(q"scala.reflect.runtime.universe.typeOf[$testSymbol.type]").asInstanceOf[universe.Type]
  ) // __wrapper$1$0bbb246b633b472e8df54efc3e9ff9d9.Test.type
}

关于ClassCastExceptionIllegalArgumentException,我注意到如果您更改UDF返回类型,异常就会消失

def simpleUdf = F.udf (
  simpleFn (_),
  dataType = T.StructType (
    List (
      T.StructField ("a_string", T.StringType),
      T.StructField ("tail1", T.StructType (
        List (
          T.StructField ("another_string", T.StringType),
          T.StructField ("tail2", T.StructType (
            List (
              T.StructField ("an_int", T.IntegerType),
            )
          )),
        )
      )),
    )
  )
)

//+------+-----+-------------------------------------+
//|column|input|array_data                           |
//+------+-----+-------------------------------------+
//|bar   |foo  |{hello world, {Another String, {42}}}|
//+------+-----+-------------------------------------+

我想这是有道理的,因为List::(又名$colon$colon)的头部和尾部,然后尾部是::的头部和尾部等。

相关问题