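I know there are many answers related to my question, but since I'm very new to Scala, I couldn't understand them. I'd be grateful if someone could help me fix my UDF.
I have this UDF, which converts a timestamp string from GMT to MST: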
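import java.time.{LocalDateTime, ZoneId, ZonedDateTime}
import java.time.format.DateTimeFormatter

val Gmt2Mst = (dtm_str: String, inFmt: String, outFmt: String) => {
  // Return null for empty, null, or too-short input strings
  if ("".equals(dtm_str) || dtm_str == null || dtm_str.length() < inFmt.length()) {
    null
  }
  else {
    val gmtZoneId = ZoneId.of("GMT", ZoneId.SHORT_IDS)
    val mstZoneId = ZoneId.of("MST", ZoneId.SHORT_IDS) // SHORT_IDS maps "MST" to the fixed offset -07:00
    val inFormatter = DateTimeFormatter.ofPattern(inFmt)
    val outFormatter = DateTimeFormatter.ofPattern(outFmt)
    val dateTime = LocalDateTime.parse(dtm_str, inFormatter)
    // Interpret the parsed local time as GMT, then shift the same instant to MST
    val gmt = ZonedDateTime.of(dateTime, gmtZoneId)
    val mst = gmt.withZoneSameInstant(mstZoneId)
    mst.format(outFormatter)
  }
}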
spark.udf.register("Gmt2Mst", Gmt2Mst)
But it fails whenever it encounters `NULL`. I tried to handle that with `dtm_str == null`, but it still fails. Could someone tell me what I should use instead of `dtm_str == null` to make this work?
For example, if I run the following Spark SQL:
spark.sql("select null as col1, Gmt2Mst(null,'yyyyMMddHHmm', 'yyyyMMddHHmm') as col2").show()
I get the following error:
22/09/20 14:10:31 INFO TaskSetManager: Starting task 101.1 in stage 27.0 (TID 1809) (10.243.37.204, executor 18, partition 101, PROCESS_LOCAL, 5648 bytes) taskResourceAssignments Map()
22/09/20 14:10:31 INFO TaskSetManager: Lost task 100.0 in stage 27.0 (TID 1801) on 10.243.37.204, executor 18: org.apache.spark.SparkException (Failed to execute user defined function (anonfun$4: (string, string) => string)) [duplicate 1]
22/09/20 14:10:31 INFO TaskSetManager: Starting task 100.1 in stage 27.0 (TID 1810) (10.243.37.241, executor 1, partition 100, PROCESS_LOCAL, 5648 bytes) taskResourceAssignments Map()
22/09/20 14:10:31 INFO TaskSetManager: Lost task 102.0 in stage 27.0 (TID 1803) on 10.243.37.241, executor 1: org.apache.spark.SparkException (Failed to execute user defined function (anonfun$4: (string, string) => string)) [duplicate 2]
22/09/20 14:10:31 INFO TaskSetManager: Starting task 102.1 in stage 27.0 (TID 1811) (10.243.36.183, executor 22, partition 102, PROCESS_LOCAL, 5648 bytes) taskResourceAssignments Map()
22/09/20 14:10:31 INFO TaskSetManager: Finished task 80.0 in stage 27.0 (TID 1781) in 2301 ms on 10.243.36.183 (executor 22) (81/355)
22/09/20 14:10:31 INFO TaskSetManager: Starting task 108.0 in stage 27.0 (TID 1812) (10.243.36.156, executor 4, partition 108, PROCESS_LOCAL, 5648 bytes) taskResourceAssignments Map()
22/09/20 14:10:31 INFO TaskSetManager: Lost task 103.0 in stage 27.0 (TID 1804) on 10.243.36.156, executor 4: org.apache.spark.SparkException (Failed to execute user defined function (anonfun$4: (string, string) => string)) [duplicate 3]
22/09/20 14:10:31 INFO TaskSetManager: Starting task 103.1 in stage 27.0 (TID 1813) (10.243.36.180, executor 9, partition 103, PROCESS_LOCAL, 5648 bytes) taskResourceAssignments Map()
22/09/20 14:10:31 WARN TaskSetManager: Lost task 105.0 in stage 27.0 (TID 1806) (10.243.36.180 executor 9): org.apache.spark.SparkException: Failed to execute user defined function (anonfun$3: (string, string) => string)
at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:136)
at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NullPointerException
at org.commonspirit.sepsis_bio.recovery.SepsisRecoveryBundle$$anonfun$3.apply(SepsisRecoveryBundle.scala:123)
at org.commonspirit.sepsis_bio.recovery.SepsisRecoveryBundle$$anonfun$3.apply(SepsisRecoveryBundle.scala:122)
... 15 more
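One way to narrow this down is to call the function directly in plain Scala, without Spark. The sketch below copies the question's logic into a hypothetical `Gmt2MstCheck` object and calls it with a null and a non-null input (the sample value `202209201410` is made up). Because `||` short-circuits, a null `dtm_str` should return `null` before `dtm_str.length()` is ever evaluated, so if this runs cleanly the null check itself is probably not what throws. It may also be worth noting that the log reports a UDF of type `(string, string) => string`, while `Gmt2Mst` takes three arguments.

```scala
import java.time.{LocalDateTime, ZoneId, ZonedDateTime}
import java.time.format.DateTimeFormatter

// Hypothetical wrapper object; the function body mirrors the question's Gmt2Mst
object Gmt2MstCheck {
  val gmt2Mst: (String, String, String) => String = (dtmStr, inFmt, outFmt) =>
    if ("".equals(dtmStr) || dtmStr == null || dtmStr.length < inFmt.length) null
    else {
      val dateTime = LocalDateTime.parse(dtmStr, DateTimeFormatter.ofPattern(inFmt))
      ZonedDateTime.of(dateTime, ZoneId.of("GMT", ZoneId.SHORT_IDS))
        .withZoneSameInstant(ZoneId.of("MST", ZoneId.SHORT_IDS)) // MST = fixed -07:00
        .format(DateTimeFormatter.ofPattern(outFmt))
    }

  def main(args: Array[String]): Unit = {
    println(gmt2Mst(null, "yyyyMMddHHmm", "yyyyMMddHHmm"))           // prints null
    println(gmt2Mst("202209201410", "yyyyMMddHHmm", "yyyyMMddHHmm")) // prints 202209200710
  }
}
```

Since `ZoneId.SHORT_IDS` maps `MST` to a fixed offset of `-07:00`, 14:10 GMT becomes 07:10 MST; note that this offset never switches for daylight saving time.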
1 answer
I ran the following test, and it seems to work:
Create a DataFrame with a null-typed column. The schema will be:
For example:
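A minimal sketch of such a DataFrame, assuming Spark 3.x and a local `SparkSession`: selecting `lit(null)` without a cast produces a column whose type is `NullType`. The object name `NullColumnDemo` and the method `nullColumnDf` are illustrative, not from the answer.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit

object NullColumnDemo {
  // One row with a single column "col1"; an uncast lit(null) has type NullType
  def nullColumnDf(spark: SparkSession): DataFrame =
    spark.range(1).select(lit(null).as("col1"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("null-column-demo").getOrCreate()
    try nullColumnDf(spark).printSchema() // shows col1 with the null type
    finally spark.stop()
  }
}
```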
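Create the UDF:
Note that using `Any` in Scala is bad practice, but this is Spark SQL: to handle a value that can be of two different types, you need to use this supertype.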
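A sketch of what an `Any`-typed version of the conversion could look like, assuming the function matches on the runtime type of its first argument. A type pattern such as `case s: String` never matches `null`, so a null input (and, as an assumption here, any non-string value or a too-short string) falls through to the default case and returns `null`. The object name `Gmt2MstAny` is hypothetical.

```scala
import java.time.{LocalDateTime, ZoneId, ZonedDateTime}
import java.time.format.DateTimeFormatter

object Gmt2MstAny {
  // First parameter is Any so it can receive either a null-typed or a string value
  val gmt2Mst: (Any, String, String) => String = (dtm, inFmt, outFmt) =>
    dtm match {
      case s: String if s.length >= inFmt.length =>
        val dateTime = LocalDateTime.parse(s, DateTimeFormatter.ofPattern(inFmt))
        ZonedDateTime.of(dateTime, ZoneId.of("GMT", ZoneId.SHORT_IDS))
          .withZoneSameInstant(ZoneId.of("MST", ZoneId.SHORT_IDS))
          .format(DateTimeFormatter.ofPattern(outFmt))
      case _ => null // null, empty or too-short strings, and any other type
    }
}
```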
Register the UDF:
Create a view:
Apply the UDF to the view:
It shows:
Apply it to a column with non-null values:
It shows:
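The register, view, and apply steps can be sketched end to end, assuming Spark 3.x and a local session. The view name `demo_view`, the columns `null_col` and `dtm_str`, the sample value `202209201410`, and the object name `Gmt2MstSqlDemo` are all made up for illustration; the conversion function repeats the `Any`-typed sketch so that this block runs on its own.

```scala
import java.time.{LocalDateTime, ZoneId, ZonedDateTime}
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

object Gmt2MstSqlDemo {
  // Any-typed conversion: null and non-string inputs yield null
  val gmt2Mst: (Any, String, String) => String = (dtm, inFmt, outFmt) =>
    dtm match {
      case s: String if s.length >= inFmt.length =>
        val dateTime = LocalDateTime.parse(s, DateTimeFormatter.ofPattern(inFmt))
        ZonedDateTime.of(dateTime, ZoneId.of("GMT", ZoneId.SHORT_IDS))
          .withZoneSameInstant(ZoneId.of("MST", ZoneId.SHORT_IDS))
          .format(DateTimeFormatter.ofPattern(outFmt))
      case _ => null
    }

  // Registers the UDF, builds a view with a null-typed and a string column,
  // and returns the UDF result for each column
  def run(spark: SparkSession): Seq[String] = {
    import spark.implicits._
    spark.udf.register("Gmt2Mst", gmt2Mst)
    Seq("202209201410").toDF("dtm_str")
      .withColumn("null_col", lit(null))
      .createOrReplaceTempView("demo_view")
    val row = spark.sql(
      """select Gmt2Mst(null_col, 'yyyyMMddHHmm', 'yyyyMMddHHmm') as from_null,
        |       Gmt2Mst(dtm_str, 'yyyyMMddHHmm', 'yyyyMMddHHmm') as from_string
        |from demo_view""".stripMargin).head()
    Seq(row.getString(0), row.getString(1))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("gmt2mst-demo").getOrCreate()
    try println(run(spark))
    finally spark.stop()
  }
}
```

With this input, `from_null` should come back as `null` and `from_string` as `202209200710`.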