Flink cannot find a Groovy class during checkpointing

Posted by hxzsmxv2 on 2021-06-21 in Flink

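I have a problem with Flink. My real-time compute engine uses Groovy scripts to extend the available compute types (such as sum, average, and count). We defined a standard compute interface (AbstractComputable). To add a compute type to the framework, I only need to implement AbstractComputable and store the Groovy script in the database. The application then reads the script for each task and loads it into the JVM through GroovyClassLoader. This works well outside of Flink, but fails inside a Flink job. The reason is that at checkpoint time Flink loads the object instantiated from the Groovy script with a different class loader (FlinkUserCodeClassLoaders$ChildFirstClassLoader) instead of GroovyClassLoader.
Code: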

// Init Groovy ClassLoader
CompilerConfiguration classLoaderConfig = new CompilerConfiguration();
classLoaderConfig.setSourceEncoding("UTF-8");
CLASS_LOADER = new GroovyClassLoader(Thread.currentThread().getContextClassLoader(), classLoaderConfig);

......
......
// Parse the script, instantiate the class, and put the instance into the cache
Class<?> clazz = CLASS_LOADER.parseClass(computeType.getScript());
AbstractComputable computableObject = (AbstractComputable) clazz.getDeclaredConstructor().newInstance();

removeComputeType(computeType);
// Store the custom compute object in the cache
IndicatorCache.COMPUTABLE_OBJECT_CACHE.put(computeType.getId().intValue(), computableObject);

......
......

AbstractComputable computable = IndicatorCache.COMPUTABLE_OBJECT_CACHE.get(indicator.getComputeType());
if (computable == null) {
   if (log.isDebugEnabled()) {
      log.debug("without computeType:{} in cache", indicator.getComputeType());
   }
   return false;
}
indicator.setComputableObject(computable);


Exception stack trace:

com.esotericsoftware.kryo.KryoException: Unable to find class: com.xxx.xxx.common.computable.CurValueCompute
Serialization trace:
computableObject (com.xxx.xxx.common.pojo.property.IndicatorProperty)
normalIndicatorList (com.xxx.xxx.common.pojo.property.ComputeTuple)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.xxx.xxx.common.computable.CurValueCompute
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
        ... 41 common frames omitted

How can I correctly use Groovy as a dynamic language in Flink?

Answer 1 (qojgxg4l):

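Flink needs to know the types it is processing. Otherwise it has no way to serialize and deserialize instances. The class definitions therefore need to be included in the user code jar that is submitted to the Flink cluster.
If you want to support dynamically loaded classes, you should serialize those instances into a generic format (for example AbstractComputeContainer) that you can fully resolve inside the user code function with GroovyClassLoader.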
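A minimal sketch of that container idea, under stated assumptions: the names Computable, ComputeContainer, ContainerSketch, and compile are hypothetical and do not come from the question's code base. Plain java.io serialization stands in for Flink's serializers, and the compile method is a stub marking where GroovyClassLoader.parseClass would be called inside the user function. The point it illustrates is that the record carries only the script source, while the script-generated object sits in a transient field and is rebuilt after deserialization. Whether the serializer Flink picks for your type skips transient fields in the same way should be verified against the Flink version in use.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

/** Hypothetical stand-in for the question's AbstractComputable. */
interface Computable {
    double compute(double[] values);
}

/** Hypothetical generic container: only plain, jar-known types are serialized. */
class ComputeContainer implements Serializable {
    private static final long serialVersionUID = 1L;

    private final int computeTypeId;
    private final String script;
    // Never serialized, so the receiving side needs no dynamically generated class.
    private transient Computable resolved;

    ComputeContainer(int computeTypeId, String script) {
        this.computeTypeId = computeTypeId;
        this.script = script;
    }

    int getComputeTypeId() {
        return computeTypeId;
    }

    boolean isResolved() {
        return resolved != null;
    }

    /** Lazily rebuilds the compute object, e.g. after deserialization. */
    Computable resolve(Function<String, Computable> compiler) {
        if (resolved == null) {
            resolved = compiler.apply(script);
        }
        return resolved;
    }
}

public class ContainerSketch {

    // Assumption: in a real job this is where the user function would call
    // GroovyClassLoader.parseClass(script) and instantiate the result.
    static Computable compile(String script) {
        if ("sum".equals(script)) {
            return values -> {
                double total = 0;
                for (double v : values) {
                    total += v;
                }
                return total;
            };
        }
        throw new IllegalArgumentException("unknown script: " + script);
    }

    // Simulates a serialization round trip between operators or into state.
    static ComputeContainer roundTrip(ComputeContainer in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(in);
            }
            try (ObjectInputStream ois =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ComputeContainer) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ComputeContainer original = new ComputeContainer(1, "sum");
        original.resolve(ContainerSketch::compile);

        ComputeContainer copy = roundTrip(original);
        System.out.println(copy.isResolved()); // false: the transient field was dropped
        System.out.println(copy.resolve(ContainerSketch::compile).compute(new double[] {1, 2, 3})); // 6.0
    }
}
```

In the question's terms, IndicatorProperty would hold something like this container (or just the compute type ID) instead of a direct reference to the Groovy-created object, and the lookup in COMPUTABLE_OBJECT_CACHE would happen inside the operator that actually performs the computation.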
