java.lang.ClassCastException: Netty fails when executing a jar on Flink

kpbwa7wx  posted on 2021-06-25  in Flink

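I have a Flink jar that sinks a data stream of a serializable data type to Elasticsearch and Cassandra, and its behaviour on the cluster differs from the standalone context. I have read about Netty conflicting with the Flink process, so I excluded it in the pom file, but it still ends up in the package.
Any suggestions?
The exception is as follows: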

java.lang.ClassCastException: io.netty.channel.epoll.EpollEventLoopGroup cannot be cast to io.netty.channel.EventLoopGroup
    at com.datastax.driver.core.NettyUtil.newEventLoopGroupInstance(NettyUtil.java:134)
    at com.datastax.driver.core.NettyOptions.eventLoopGroup(NettyOptions.java:99)
    at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:774)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1446)
    at com.datastax.driver.core.Cluster.init(Cluster.java:159)
    at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:330)
    at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:305)
    at com.datastax.driver.core.Cluster.connect(Cluster.java:247)
    at it.almaviva.wtf.mms.integratemobilitystatusevent.repository.cassandra.ScheduledEventRepositoryImpl.findProgrammed(ScheduledEventRepositoryImpl.java:51)
    at it.almaviva.wtf.mms.integratemobilitystatusevent.transformation.ObservedEventDelayProcessFunction.loadScheduledEvents(ObservedEventDelayProcessFunction.java:368)
    at it.almaviva.wtf.mms.integratemobilitystatusevent.transformation.ObservedEventDelayProcessFunction.processElement(ObservedEventDelayProcessFunction.java:64)
    at it.almaviva.wtf.mms.integratemobilitystatusevent.transformation.ObservedEventDelayProcessFunction.processElement(ObservedEventDelayProcessFunction.java:1)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:94)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: java.lang.NullPointerException
        at com.datastax.driver.core.Cluster$Manager.close(Cluster.java:1676)
        at com.datastax.driver.core.Cluster$Manager.access$200(Cluster.java:1354)
        at com.datastax.driver.core.Cluster.closeAsync(Cluster.java:566)
        at com.datastax.driver.core.Cluster.close(Cluster.java:578)
        at it.almaviva.wtf.mms.integratemobilitystatusevent.repository.cassandra.ScheduledEventRepositoryImpl.findProgrammed(ScheduledEventRepositoryImpl.java:68)

This is the pom file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>it.almaviva.wtf.mms</groupId>
    <artifactId>mms-integretedmobilitystatusevent-flink-process</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <name>Passing Event Process</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.4.0</flink.version>
        <slf4j.version>1.7.7</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
        <scala.binary.version>2.11</scala.binary.version>
        <junit.version>4.12</junit.version>
        <cassandra.version>3.2.0</cassandra.version>
        <flink.connector.elastic.version>1.3.1</flink.connector.elastic.version>
        <thyco.compiler.version>0.21.0</thyco.compiler.version>
        <jackson.version>2.9.4</jackson.version>

        <cassandra.unit.version>3.3.0.2</cassandra.unit.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
            </snapshots>
        </repository>
    </repositories>

    <!-- Execute "mvn clean package -Pbuild-jar" to build a jar file out of
              this project! How to use the Flink Quickstart pom: a) Adding new dependencies:
              You can add dependencies to the list below. Please check if the maven-shade-plugin
              below is filtering out your dependency and remove the exclude from there.
              b) Build a jar for running on the cluster: There are two options for creating
              a jar from this project b.1) "mvn clean package" -> this will create a fat
              jar which contains all dependencies necessary for running the jar created
              by this pom in a cluster. The "maven-shade-plugin" excludes everything that
              is provided on a running Flink cluster. b.2) "mvn clean package -Pbuild-jar"
              -> This will also create a fat-jar, but with much nicer dependency exclusion
              handling. This approach is preferred and leads to much cleaner jar files. -->

    <dependencies>
        <!-- Apache Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- This dependency is required to actually execute jobs. It is currently pulled in by
flink-streaming-java, but we explicitly depend on it to safeguard against future changes. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- explicitly add a standard loggin framework, as Flink does not have
                      a hard dependency on one specific framework by default -->

        <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>com.carrotsearch</groupId>
            <artifactId>hppc</artifactId>
            <version>0.3.4</version>
        </dependency>
        <!-- TEST PURPOSE -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>
        <dependency>
            <groupId>org.cassandraunit</groupId>
            <artifactId>cassandra-unit-shaded</artifactId>
            <version>3.3.0.2</version>
        </dependency>

        <!-- DATASTAX -->

        <dependency>
            <groupId>com.datastax.cassandra</groupId>
            <artifactId>cassandra-driver-extras</artifactId>
            <version>${cassandra.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>io.netty</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.datastax.cassandra</groupId>
            <artifactId>cassandra-driver-mapping</artifactId>
            <version>${cassandra.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>io.netty</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

         <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch5_2.10</artifactId>
            <version>${flink.connector.elastic.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-cassandra_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>
    </dependencies>

    <profiles>
        <profile>
            <!-- Profile for packaging correct JAR files -->
            <id>build-jar</id>

            <activation>
                <activeByDefault>false</activeByDefault>
            </activation>

            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-core</artifactId>
                    <version>${flink.version}</version>
                    <scope>provided</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                    <version>${flink.version}</version>
                    <scope>provided</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-clients_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                    <scope>provided</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                    <scope>provided</scope>
                </dependency>
                <dependency>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                    <version>${log4j.version}</version>
                    <scope>provided</scope>
                </dependency>
                <!-- DTO -->
                <dependency>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                    <version>${jackson.version}</version>
                </dependency>
                <!-- APACHE CASSANDRA -->
                <dependency>
                    <groupId>com.datastax.cassandra</groupId>
                    <artifactId>cassandra-driver-extras</artifactId>
                    <version>${cassandra.version}</version>
                </dependency>
                <dependency>
                    <groupId>com.datastax.cassandra</groupId>
                    <artifactId>cassandra-driver-mapping</artifactId>
                    <version>${cassandra.version}</version>
                </dependency>

            </dependencies>

            <build>
                <plugins>
                    <!-- disable the exclusion rules -->
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-shade-plugin</artifactId>
                        <version>2.4.1</version>
                        <executions>
                            <execution>
                                <phase>package</phase>
                                <goals>
                                    <goal>shade</goal>
                                </goals>
                                <configuration>
                                    <artifactSet>
                                        <excludes combine.self="override"/>
                                    </artifactSet>
                                </configuration>
                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </build>
        </profile>
    </profiles>

    <build>
        <plugins>
            <!-- We use the maven-shade plugin to create a fat jar that contains all
          dependencies except flink and it's transitive dependencies. The resulting
          fat-jar can be executed on a cluster. Change the value of Program-Class if
          your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <!-- This list contains all dependencies of flink-dist Everything
                      else will be packaged into the fat-jar -->
                                    <exclude>org.apache.flink:flink-annotations</exclude>
                                    <exclude>org.apache.flink:flink-shaded-hadoop1_2.10</exclude>
                                    <exclude>org.apache.flink:flink-shaded-hadoop2</exclude>
                                    <exclude>org.apache.flink:flink-shaded-curator-recipes</exclude>
                                    <exclude>org.apache.flink:flink-core</exclude>
                                    <exclude>org.apache.flink:flink-java</exclude>
                                    <!--                                                                    <exclude>org.apache.flink:flink-scala_2.10</exclude> -->
                                    <exclude>org.apache.flink:flink-runtime_2.10</exclude>
                                    <exclude>org.apache.flink:flink-optimizer_2.10</exclude>
                                    <exclude>org.apache.flink:flink-clients_2.10</exclude>
                                    <exclude>org.apache.flink:flink-avro_2.10</exclude>
                                    <exclude>org.apache.flink:flink-examples-batch_2.10</exclude>
                                    <exclude>org.apache.flink:flink-examples-streaming_2.10</exclude>
                                    <exclude>org.apache.flink:flink-streaming-java_2.10</exclude>

                                    <!-- Also exclude very big transitive dependencies of Flink WARNING:
                      You have to remove these excludes if your code relies on other versions of
                      these dependencies. -->
                                    <exclude>org.scala-lang:scala-library</exclude>
                                    <exclude>org.scala-lang:scala-compiler</exclude>
                                    <exclude>org.scala-lang:scala-reflect</exclude>
                                    <exclude>com.typesafe.akka:akka-actor_*</exclude>
                                    <exclude>com.typesafe.akka:akka-remote_*</exclude>
                                    <exclude>io.netty:netty-all</exclude>
                                    <exclude>io.netty:netty</exclude>
                                    <exclude>commons-fileupload:commons-fileupload</exclude>
                                    <exclude>org.apache.avro:avro</exclude>
                                    <exclude>commons-collections:commons-collections</exclude>
                                    <exclude>org.codehaus.jackson:jackson-core-asl</exclude>
                                    <exclude>org.codehaus.jackson:jackson-mapper-asl</exclude>
                                    <exclude>com.thoughtworks.paranamer:paranamer</exclude>
                                    <exclude>org.xerial.snappy:snappy-java</exclude>
                                    <exclude>org.apache.commons:commons-compress</exclude>
                                    <exclude>org.tukaani:xz</exclude>
                                    <exclude>com.esotericsoftware.kryo:kryo</exclude>
                                    <exclude>com.esotericsoftware.minlog:minlog</exclude>
                                    <exclude>org.objenesis:objenesis</exclude>
                                    <exclude>com.twitter:chill_*</exclude>
                                    <exclude>com.twitter:chill-java</exclude>
                                    <exclude>commons-lang:commons-lang</exclude>
                                    <exclude>junit:junit</exclude>
                                    <exclude>org.apache.commons:commons-lang3</exclude>
                                    <exclude>log4j:log4j</exclude>
                                    <exclude>org.apache.commons:commons-math</exclude>
                                    <exclude>org.apache.sling:org.apache.sling.commons.json</exclude>
                                    <exclude>commons-logging:commons-logging</exclude>
                                    <exclude>commons-codec:commons-codec</exclude>
                                    <exclude>stax:stax-api</exclude>
                                    <exclude>com.typesafe:config</exclude>
                                    <exclude>org.uncommons.maths:uncommons-maths</exclude>
                                    <exclude>com.github.scopt:scopt_*</exclude>
                                    <exclude>commons-io:commons-io</exclude>
                                    <exclude>commons-cli:commons-cli</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>org.apache.flink:*</artifact>
                                    <excludes>
                                        <!-- exclude shaded google but include shaded curator -->
                                        <exclude>org/apache/flink/shaded/com/**</exclude>
                                        <exclude>web-docs/**</exclude>
                                    </excludes>
                                </filter>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder. Otherwise,
                      this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <!-- If you want to use ./bin/flink run <quickstart jar> uncomment
                  the following lines. This will add a Main-Class entry to the manifest file -->
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>it.almaviva.wtf.mms.integratemobilitystatusevent.PassingEventProcess
                                    </mainClass>
                                </transformer>
                            </transformers>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <compilerId>jdt</compilerId>
                </configuration>
                <dependencies>
                    <dependency>
                        <groupId>org.eclipse.tycho</groupId>
                        <artifactId>tycho-compiler-jdt</artifactId>
                        <version>0.21.0</version>
                    </dependency>
                </dependencies>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.7.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <compilerId>jdt</compilerId>
                    </configuration>
                    <dependencies>
                        <dependency>
                            <groupId>org.eclipse.tycho</groupId>
                            <artifactId>tycho-compiler-jdt</artifactId>
                            <version>0.21.0</version>
                        </dependency>
                    </dependencies>
                </plugin>

                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-assembly-plugin</artifactId>
                                        <versionRange>[2.4,)</versionRange>
                                        <goals>
                                            <goal>single</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>

kcugc4gi1#

You can disable Netty's native epoll transport and force the default NIO-based transport by adding the JVM argument -Dcom.datastax.driver.FORCE_NIO=true
(https://docs.datastax.com/en/developer/java-driver-dse/1.4/faq/#what-is-netty-s-native-epoll-transport-and-how-do-i-enable-or-disable-it).
In Flink, you have to pass that argument through the env.java.opts entry in conf/flink-conf.yaml
(https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#common-options).
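To check from inside the job that the setting actually reached the TaskManager JVM, a small helper along these lines may be useful. This is only a sketch: the class name ForceNioCheck and its method are hypothetical and not part of Flink or the DataStax driver. It reads the same system property named above using only the JDK. The comment at the top shows how the flink-conf.yaml entry would plausibly look.

```java
// Assumed entry in conf/flink-conf.yaml (key and JVM argument are taken from the answer above):
//
//   env.java.opts: -Dcom.datastax.driver.FORCE_NIO=true
//
// JVM options are only read at startup, so the Flink processes need a restart afterwards.

// Hypothetical helper, not part of Flink or the DataStax driver.
final class ForceNioCheck {

    // System property name quoted in the answer.
    static final String FORCE_NIO = "com.datastax.driver.FORCE_NIO";

    private ForceNioCheck() {
    }

    /** Returns true when the current JVM has the property set to "true". */
    static boolean isNioForced() {
        // Boolean.getBoolean reads the system property and parses it case-insensitively.
        return Boolean.getBoolean(FORCE_NIO);
    }

    public static void main(String[] args) {
        System.out.println(FORCE_NIO + " = " + System.getProperty(FORCE_NIO));
        System.out.println("NIO forced: " + isNioForced());
    }
}
```

Calling ForceNioCheck.isNioForced() once, for example in the open() method of the process function before the Cassandra Cluster is built, and logging the result should show whether the option was picked up.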

9rnv2umw2#

I added this parameter to conf/flink-conf.yaml and, great, it works like a charm! I had spent hours checking the pom file with my colleagues. Thanks.
Refs:
https://docs.datastax.com/en/developer/java-driver-dse/1.4/faq/
https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/config.html#runtime-algorithms
