How do I handle null values in a Scala UDF?

avwztpqn · published 2022-11-09 in Scala

I know there are many answers related to my question, but since I'm very new to Scala I haven't been able to make sense of them. I would be grateful if someone could help me correct my UDF.
I have this UDF, which performs a timezone conversion from GMT to MST:

import java.time.{LocalDateTime, ZoneId, ZonedDateTime}
import java.time.format.DateTimeFormatter

val Gmt2Mst = (dtm_str: String, inFmt: String, outFmt: String) => {
  // Return null for null, empty, or too-short input strings.
  if (dtm_str == null || dtm_str.isEmpty || dtm_str.length < inFmt.length) {
    null
  } else {
    val gmtZoneId = ZoneId.of("GMT", ZoneId.SHORT_IDS)
    val mstZoneId = ZoneId.of("MST", ZoneId.SHORT_IDS)

    val inFormatter  = DateTimeFormatter.ofPattern(inFmt)
    val outFormatter = DateTimeFormatter.ofPattern(outFmt)
    val dateTime     = LocalDateTime.parse(dtm_str, inFormatter)
    val gmt          = ZonedDateTime.of(dateTime, gmtZoneId)
    val mst          = gmt.withZoneSameInstant(mstZoneId)
    mst.format(outFormatter)
  }
}

spark.udf.register("Gmt2Mst", Gmt2Mst)

But it fails whenever it encounters a NULL. I am trying to handle that with dtm_str == null, but it still fails. Can someone suggest what correction I can make instead of dtm_str == null so that this works?
As an example, if I run the Spark SQL below:

spark.sql("select null as col1, Gmt2Mst(null,'yyyyMMddHHmm', 'yyyyMMddHHmm') as col2").show()

I get the following error:

22/09/20 14:10:31 INFO TaskSetManager: Starting task 101.1 in stage 27.0 (TID 1809) (10.243.37.204, executor 18, partition 101, PROCESS_LOCAL, 5648 bytes) taskResourceAssignments Map()
22/09/20 14:10:31 INFO TaskSetManager: Lost task 100.0 in stage 27.0 (TID 1801) on 10.243.37.204, executor 18: org.apache.spark.SparkException (Failed to execute user defined function (anonfun$4: (string, string) => string)) [duplicate 1]
22/09/20 14:10:31 INFO TaskSetManager: Starting task 100.1 in stage 27.0 (TID 1810) (10.243.37.241, executor 1, partition 100, PROCESS_LOCAL, 5648 bytes) taskResourceAssignments Map()
22/09/20 14:10:31 INFO TaskSetManager: Lost task 102.0 in stage 27.0 (TID 1803) on 10.243.37.241, executor 1: org.apache.spark.SparkException (Failed to execute user defined function (anonfun$4: (string, string) => string)) [duplicate 2]
22/09/20 14:10:31 INFO TaskSetManager: Starting task 102.1 in stage 27.0 (TID 1811) (10.243.36.183, executor 22, partition 102, PROCESS_LOCAL, 5648 bytes) taskResourceAssignments Map()
22/09/20 14:10:31 INFO TaskSetManager: Finished task 80.0 in stage 27.0 (TID 1781) in 2301 ms on 10.243.36.183 (executor 22) (81/355)
22/09/20 14:10:31 INFO TaskSetManager: Starting task 108.0 in stage 27.0 (TID 1812) (10.243.36.156, executor 4, partition 108, PROCESS_LOCAL, 5648 bytes) taskResourceAssignments Map()
22/09/20 14:10:31 INFO TaskSetManager: Lost task 103.0 in stage 27.0 (TID 1804) on 10.243.36.156, executor 4: org.apache.spark.SparkException (Failed to execute user defined function (anonfun$4: (string, string) => string)) [duplicate 3]
22/09/20 14:10:31 INFO TaskSetManager: Starting task 103.1 in stage 27.0 (TID 1813) (10.243.36.180, executor 9, partition 103, PROCESS_LOCAL, 5648 bytes) taskResourceAssignments Map()
22/09/20 14:10:31 WARN TaskSetManager: Lost task 105.0 in stage 27.0 (TID 1806) (10.243.36.180 executor 9): org.apache.spark.SparkException: Failed to execute user defined function (anonfun$3: (string, string) => string)
        at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:136)
        at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NullPointerException
        at org.commonspirit.sepsis_bio.recovery.SepsisRecoveryBundle$$anonfun$3.apply(SepsisRecoveryBundle.scala:123)
        at org.commonspirit.sepsis_bio.recovery.SepsisRecoveryBundle$$anonfun$3.apply(SepsisRecoveryBundle.scala:122)
        ... 15 more

8ftvxx2r1#

I ran the following test and it seems to work:
Create a DataFrame with a null-typed column. The schema will be:

root
  |-- v0: string (nullable = true)
  |-- v1: string (nullable = true)
  |-- null: null (nullable = true)

For example:

+----+-----+----+
|  v0|   v1|null|
+----+-----+----+
|hola|adios|null|
+----+-----+----+
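
For reference, such a DataFrame can be built like this (a sketch; df2 and the column names come from this answer, but the construction itself is assumed):

import org.apache.spark.sql.functions.lit
import spark.implicits._

// lit(null) yields a NullType column, reproducing the schema shown above.
val df2 = Seq(("hola", "adios")).toDF("v0", "v1").withColumn("null", lit(null))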

Create the UDF:

val udf1 = udf { v1: Any => if (v1 != null) s"${v1}_transformed" else null }

Note that using Any is bad practice in Scala, but this is Spark SQL, and to handle a value that may have two different types you need this supertype.
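If the input is known to be a string column, a typed variant that avoids Any is also possible. A minimal sketch (udf1Str is a hypothetical name, not part of the original answer); Spark passes a SQL NULL into a String-typed Scala UDF as a Scala null, which Option turns into None:

import org.apache.spark.sql.functions.udf

// Option(s) is None when s is null; orNull maps None back to a SQL NULL.
val udf1Str = udf { s: String => Option(s).map(v => s"${v}_transformed").orNull }
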
Register the UDF:

spark.udf.register("udf1", udf1)

Create a view:

df2.createTempView("df2")

Apply the UDF to the view:

spark.sql("select udf1(null) from df").show()

It shows:

+---------+
|UDF(null)|
+---------+
|     null|
+---------+

Apply it to a column with non-null values:

spark.sql("select udf1(v0) from df2").show()

It shows:

+----------------+
|         UDF(v0)|
+----------------+
|hola_transformed|
+----------------+
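
Applied back to the question, the same null guard can be used in Gmt2Mst. A sketch under the same assumptions (Gmt2MstSafe is a hypothetical name; the time-zone logic is copied from the question):

import java.time.{LocalDateTime, ZoneId, ZonedDateTime}
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.functions.udf

val Gmt2MstSafe = udf { (dtm_str: String, inFmt: String, outFmt: String) =>
  // Option(...) short-circuits on NULL before any method is called on dtm_str.
  Option(dtm_str)
    .filter(s => s.nonEmpty && s.length >= inFmt.length)
    .map { s =>
      val inFormatter  = DateTimeFormatter.ofPattern(inFmt)
      val outFormatter = DateTimeFormatter.ofPattern(outFmt)
      val gmt = ZonedDateTime.of(LocalDateTime.parse(s, inFormatter), ZoneId.of("GMT", ZoneId.SHORT_IDS))
      gmt.withZoneSameInstant(ZoneId.of("MST", ZoneId.SHORT_IDS)).format(outFormatter)
    }
    .orNull
}
spark.udf.register("Gmt2Mst", Gmt2MstSafe)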
