kafka连接器和远程执行器(apache flink)的类加载器问题

myzjeezk  于 2021-06-21  发布在  Flink
关注(0)|答案(0)|浏览(614)

当尝试在远程集群上执行flink作业时,我总是得到相同的classloader异常。

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:207)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:223)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
10/07/2016 06:34:57 Job execution switched to status FAILING.

我正在使用maven shade插件打包我的jar,这个类与jar一起打包,当我查看jar内部时可以看到这一点。

$:jar -tf MyJar.jar |grep "org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer"
    org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.class
    org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.class

这是我的pom.xml

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.10</artifactId>
            <version>1.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.10</artifactId>
            <version>1.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
            <version>1.1.2</version>
        </dependency>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>1.5</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <finalName>FloorData</finalName>
                        <shadeTestJar>false</shadeTestJar>
                        <shadedArtifactAttached>false</shadedArtifactAttached>
                        <createDependencyReducedPom>false</createDependencyReducedPom>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>reference.conf</resource>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>floordata.cli.launcher.FlinkLauncher</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>

我在我的代码中配置我的远程环境如下。我不确定这是否正确,但我是否需要指定kafka连接器的位置作为createremoteenvironment的参数??

InetSocketAddress address = parseHostPortAddress((String) configuration.get(FLINK_JOB_MANAGER_ADDRESS));
            StreamExecutionEnvironment.createRemoteEnvironment(address.getHostName(), address.getPort());

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题