我有一个spring启动的java应用程序myapp.jar,它有一些udf函数。
sparkconfuration.java文件
public SparkSession sparkSession() {
SparkSession session = SparkSession
.builder()
.appName("uniloader-spark-master")
.master(sparkMaster)
.config("spark.jars", sparkJars)
.getOrCreate();
log.info("Spark jars for control: " + sparkJars);
session.sqlContext().udf().register("to_integer", new ToIntegerUdf(), DataTypes.IntegerType);
return session;
}
tointegerudf.java文件
public class ToIntegerUdf implements UDF2<Object, Object, Integer> {
private static final long serialVersionUID = 165436543635L;
private static final Logger log = LoggerFactory.getLogger(BaseFilesUtils.class);
@Override
public Integer call(Object valueObj, Object delimiterObj) {
try {
log.info("value = " + valueObj);
log.info("delimiter = " + delimiterObj);
if (valueObj == null || delimiterObj == null) {
return null;
}
String value = String.valueOf(valueObj);
String delimiter = (String) delimiterObj;
String doubleAsString = value.replaceAll(Pattern.quote(delimiter), ".");
return new BigDecimal(doubleAsString).setScale(0, RoundingMode.HALF_UP).intValue();
} catch (Exception e) {
log.error("Error while to_integer transformation for " + valueObj + ". Details:", e);
return null;
}
}
}
sparkjars包含myjar.jar的路径。
用maven构建应用程序。spark库版本是3.02,scala版本是2.12.10。
在spark standalone 3.0.2上运行应用程序时出现错误:
java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2350)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2032)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1613)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2032)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1613)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2032)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1613)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2032)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1613)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2235)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
在spark worker日志中,我看到worker fetch myjar: 21/03/23 19:33:24 INFO Executor: Fetching spark://demo.phoenixit.ru:39597/jars/myJar.jar with timestamp 1616517199949
我认为这个问题要么依赖,要么提交,但我不知道如何解决它。
1条答案
按热度按时间7kqas0il1#
我已经修好了。
问题出在springbootmaven插件上。spark standalone无法获取有关自定义项的信息,如果我使用此插件。
当我用maven shade插件替换插件时,一切正常