为flink集群中的插件添加自定义依赖项

zpqajqem  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(769)

我有一个flink会话集群(作业管理器+任务管理器),版本1.11.1,配置了log4j-console.properties以包含kafka appender。此外,在作业管理器和任务管理器中,我都启用了flink-s3-fs-hadoop内置插件。
我已经将kafka客户机jar添加到flink/lib目录中,这是容器运行所必需的。但是当s3插件被示例化(并初始化记录器)时,我仍然得到了下面的类加载错误。
原因:org.apache.kafka.common.config.configexception:无效值org.apache.kafka.common.serialization.bytearrayserializer for configuration key.serializer:找不到类org.apache.kafka.common.serialization.bytearrayserializer。
(底部有完整的堆栈跟踪)
据我所知,插件有一个专用的动态类加载,它与系统类加载是分开的。因此,我在flink-conf.yaml文件中添加了以下配置:

classloader.parent-first-patterns.additional: org.apache.kafka
classloader.resolve-order: parent-first

但是错误仍然出现。
虽然怀疑,我没有看到额外的模式被添加到插件类加载器的“allowedflinkpackages”。
我错过了什么?

java.util.ServiceConfigurationError: org.apache.flink.core.fs.FileSystemFactory: Provider org.apache.flink.fs.s3hadoop.S3FileSystemFactory could not be instantiated
        at java.util.ServiceLoader.fail(Unknown Source) ~[?:?]
        at java.util.ServiceLoader$ProviderImpl.newInstance(Unknown Source) ~[?:?]
        at java.util.ServiceLoader$ProviderImpl.get(Unknown Source) ~[?:?]
        at java.util.ServiceLoader$3.next(Unknown Source) ~[?:?]
        at org.apache.flink.core.plugin.PluginLoader$ContextClassLoaderSettingIterator.next(PluginLoader.java:103) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.shaded.guava18.com.google.common.collect.Iterators$5.next(Iterators.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.shaded.guava18.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:48) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.core.fs.FileSystem.addAllFactoriesToList(FileSystem.java:1068) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.core.fs.FileSystem.loadFileSystemFactories(FileSystem.java:1050) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.core.fs.FileSystem.initialize(FileSystem.java:325) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerSecurely(TaskManagerRunner.java:315) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:297) [flink-dist_2.11-1.11.1.jar:1.11.1]
Caused by: java.lang.ExceptionInInitializerError
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
        ... 11 more
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.ByteArraySerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
        at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:728) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:481) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) ~[kafka-clients-2.5.0.jar:?]
        at org.apache.logging.log4j.core.appender.mom.kafka.DefaultKafkaProducerFactory.newKafkaProducer(DefaultKafkaProducerFactory.java:40) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.appender.mom.kafka.KafkaManager.startup(KafkaManager.java:136) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.appender.mom.kafka.KafkaAppender.start(KafkaAppender.java:164) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.config.AbstractConfiguration.start(AbstractConfiguration.java:304) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.LoggerContext.setConfiguration(LoggerContext.java:579) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:651) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:668) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.LoggerContext.start(LoggerContext.java:253) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:153) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:45) ~[log4j-core-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.LogManager.getContext(LogManager.java:194) ~[log4j-api-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getContext(AbstractLoggerAdapter.java:138) ~[log4j-api-2.12.1.jar:2.12.1]
        at org.apache.logging.slf4j.Log4jLoggerFactory.getContext(Log4jLoggerFactory.java:45) ~[log4j-slf4j-impl-2.12.1.jar:2.12.1]
        at org.apache.logging.log4j.spi.AbstractLoggerAdapter.getLogger(AbstractLoggerAdapter.java:48) ~[log4j-api-2.12.1.jar:2.12.1]
        at org.apache.logging.slf4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:30) ~[log4j-slf4j-impl-2.12.1.jar:2.12.1]
        at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:329) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:349) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.<clinit>(AbstractS3FileSystemFactory.java:88) ~[?:?]
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
        ... 11 more
[2020-12-06T09:15:45,892][Error] {} [o.a.f.c.f.FileSystem]: Failed to load a file system via services
java.util.ServiceConfigurationError: org.apache.flink.core.fs.FileSystemFactory: Provider org.apache.flink.fs.s3hadoop.S3AFileSystemFactory could not be instantiated
        at java.util.ServiceLoader.fail(Unknown Source) ~[?:?]
        at java.util.ServiceLoader$ProviderImpl.newInstance(Unknown Source) ~[?:?]
        at java.util.ServiceLoader$ProviderImpl.get(Unknown Source) ~[?:?]
        at java.util.ServiceLoader$3.next(Unknown Source) ~[?:?]
        at org.apache.flink.core.plugin.PluginLoader$ContextClassLoaderSettingIterator.next(PluginLoader.java:103) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.shaded.guava18.com.google.common.collect.Iterators$5.next(Iterators.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.shaded.guava18.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:48) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.core.fs.FileSystem.addAllFactoriesToList(FileSystem.java:1068) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.core.fs.FileSystem.loadFileSystemFactories(FileSystem.java:1050) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.core.fs.FileSystem.initialize(FileSystem.java:325) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerSecurely(TaskManagerRunner.java:315) [flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:297) [flink-dist_2.11-1.11.1.jar:1.11.1]
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.fs.s3hadoop.S3FileSystemFactory
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
        ... 11 more
i7uq4tfw

i7uq4tfw1#

正如您所说的,flink插件是通过自己的类加载器加载的,并且与任何其他插件完全隔离。
如果我们深入研究源代码,那么在集群启动时会用到另一个键(不幸的是,它没有在任何地方记录下来):

plugin.classloader.parent-first-patterns.additional

这允许您使用pluginclassloader将外部jar添加到类路径中
声明+用法:https://github.com/apache/flink/blob/53a4b4407816c2780fed2f8995affbebc1f58c3c/flink-core/src/main/java/org/apache/flink/configuration/coreoptions.java#l156-174英里
在flink-conf.yaml中添加以下内容。

plugin.classloader.parent-first-patterns.additional: org.apache.kafka

那应该可以了

相关问题