I have a Flink jar that consumes streams of serializable data types and sinks them to Elasticsearch and Cassandra, and its behaviour differs from when it runs in a standalone context. I have read about Netty conflicting with the Flink runtime, and I excluded it from the pom file, but it still ends up in the packaged jar.
Any suggestions?
The exception is the following:
java.lang.ClassCastException: io.netty.channel.epoll.EpollEventLoopGroup cannot be cast to io.netty.channel.EventLoopGroup
at com.datastax.driver.core.NettyUtil.newEventLoopGroupInstance(NettyUtil.java:134)
at com.datastax.driver.core.NettyOptions.eventLoopGroup(NettyOptions.java:99)
at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:774)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1446)
at com.datastax.driver.core.Cluster.init(Cluster.java:159)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:330)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:305)
at com.datastax.driver.core.Cluster.connect(Cluster.java:247)
at it.almaviva.wtf.mms.integratemobilitystatusevent.repository.cassandra.ScheduledEventRepositoryImpl.findProgrammed(ScheduledEventRepositoryImpl.java:51)
at it.almaviva.wtf.mms.integratemobilitystatusevent.transformation.ObservedEventDelayProcessFunction.loadScheduledEvents(ObservedEventDelayProcessFunction.java:368)
at it.almaviva.wtf.mms.integratemobilitystatusevent.transformation.ObservedEventDelayProcessFunction.processElement(ObservedEventDelayProcessFunction.java:64)
at it.almaviva.wtf.mms.integratemobilitystatusevent.transformation.ObservedEventDelayProcessFunction.processElement(ObservedEventDelayProcessFunction.java:1)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:94)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Suppressed: java.lang.NullPointerException
at com.datastax.driver.core.Cluster$Manager.close(Cluster.java:1676)
at com.datastax.driver.core.Cluster$Manager.access$200(Cluster.java:1354)
at com.datastax.driver.core.Cluster.closeAsync(Cluster.java:566)
at com.datastax.driver.core.Cluster.close(Cluster.java:578)
at it.almaviva.wtf.mms.integratemobilitystatusevent.repository.cassandra.ScheduledEventRepositoryImpl.findProgrammed(ScheduledEventRepositoryImpl.java:68)
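A ClassCastException between two Netty types that normally fit together usually means the interface and the implementation were loaded from different Netty copies or different classloaders. A minimal diagnostic sketch (not part of the original report; it assumes both classes are on the classpath) that prints where each class comes from — if the locations or classloaders differ, two Netty copies are in play:

import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;

public class NettyOriginCheck {
    public static void main(String[] args) {
        // Print the jar and classloader each class was resolved from; a mismatch
        // means the Cassandra driver and Flink see different Netty copies.
        System.out.println(EventLoopGroup.class
                .getProtectionDomain().getCodeSource().getLocation());
        System.out.println(EventLoopGroup.class.getClassLoader());
        System.out.println(EpollEventLoopGroup.class
                .getProtectionDomain().getCodeSource().getLocation());
        System.out.println(EpollEventLoopGroup.class.getClassLoader());
    }
}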
Here is the pom file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>it.almaviva.wtf.mms</groupId>
<artifactId>mms-integretedmobilitystatusevent-flink-process</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>Passing Event Process</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.4.0</flink.version>
<slf4j.version>1.7.7</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<scala.binary.version>2.11</scala.binary.version>
<junit.version>4.12</junit.version>
<cassandra.version>3.2.0</cassandra.version>
<flink.connector.elastic.version>1.3.1</flink.connector.elastic.version>
<tycho.compiler.version>0.21.0</tycho.compiler.version>
<jackson.version>2.9.4</jackson.version>
<cassandra.unit.version>3.3.0.2</cassandra.unit.version>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<!-- Execute "mvn clean package -Pbuild-jar" to build a jar file out of
this project! How to use the Flink Quickstart pom: a) Adding new dependencies:
You can add dependencies to the list below. Please check if the maven-shade-plugin
below is filtering out your dependency and remove the exclude from there.
b) Build a jar for running on the cluster: There are two options for creating
a jar from this project b.1) "mvn clean package" -> this will create a fat
jar which contains all dependencies necessary for running the jar created
by this pom in a cluster. The "maven-shade-plugin" excludes everything that
is provided on a running Flink cluster. b.2) "mvn clean package -Pbuild-jar"
-> This will also create a fat-jar, but with much nicer dependency exclusion
handling. This approach is preferred and leads to much cleaner jar files. -->
<dependencies>
<!-- Apache Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- This dependency is required to actually execute jobs. It is currently pulled in by
flink-streaming-java, but we explicitly depend on it to safeguard against future changes. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- explicitly add a standard logging framework, as Flink does not have
a hard dependency on one specific framework by default -->
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>com.carrotsearch</groupId>
<artifactId>hppc</artifactId>
<version>0.3.4</version>
</dependency>
<!-- TEST PURPOSE -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.cassandraunit</groupId>
<artifactId>cassandra-unit-shaded</artifactId>
<version>${cassandra.unit.version}</version>
<scope>test</scope>
</dependency>
<!-- DATASTAX -->
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-extras</artifactId>
<version>${cassandra.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-mapping</artifactId>
<version>${cassandra.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId>
<version>${flink.connector.elastic.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-cassandra_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
</dependencies>
<profiles>
<profile>
<!-- Profile for packaging correct JAR files -->
<id>build-jar</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
<scope>provided</scope>
</dependency>
<!-- DTO -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- APACHE CASSANDRA -->
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-extras</artifactId>
<version>${cassandra.version}</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-mapping</artifactId>
<version>${cassandra.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- disable the exclusion rules -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes combine.self="override"/>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<build>
<plugins>
<!-- We use the maven-shade plugin to create a fat jar that contains all
dependencies except Flink and its transitive dependencies. The resulting
fat jar can be executed on a cluster. Change the value of Program-Class if
your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<!-- This list contains all dependencies of flink-dist. Everything
else will be packaged into the fat jar. The Scala suffix is wildcarded
so the excludes match whichever Scala binary version is in use. -->
<exclude>org.apache.flink:flink-annotations</exclude>
<exclude>org.apache.flink:flink-shaded-hadoop1_*</exclude>
<exclude>org.apache.flink:flink-shaded-hadoop2</exclude>
<exclude>org.apache.flink:flink-shaded-curator-recipes</exclude>
<exclude>org.apache.flink:flink-core</exclude>
<exclude>org.apache.flink:flink-java</exclude>
<!-- <exclude>org.apache.flink:flink-scala_*</exclude> -->
<exclude>org.apache.flink:flink-runtime_*</exclude>
<exclude>org.apache.flink:flink-optimizer_*</exclude>
<exclude>org.apache.flink:flink-clients_*</exclude>
<exclude>org.apache.flink:flink-avro_*</exclude>
<exclude>org.apache.flink:flink-examples-batch_*</exclude>
<exclude>org.apache.flink:flink-examples-streaming_*</exclude>
<exclude>org.apache.flink:flink-streaming-java_*</exclude>
<!-- Also exclude very big transitive dependencies of Flink WARNING:
You have to remove these excludes if your code relies on other versions of
these dependencies. -->
<exclude>org.scala-lang:scala-library</exclude>
<exclude>org.scala-lang:scala-compiler</exclude>
<exclude>org.scala-lang:scala-reflect</exclude>
<exclude>com.typesafe.akka:akka-actor_*</exclude>
<exclude>com.typesafe.akka:akka-remote_*</exclude>
<exclude>io.netty:netty-all</exclude>
<exclude>io.netty:netty</exclude>
<exclude>commons-fileupload:commons-fileupload</exclude>
<exclude>org.apache.avro:avro</exclude>
<exclude>commons-collections:commons-collections</exclude>
<exclude>org.codehaus.jackson:jackson-core-asl</exclude>
<exclude>org.codehaus.jackson:jackson-mapper-asl</exclude>
<exclude>com.thoughtworks.paranamer:paranamer</exclude>
<exclude>org.xerial.snappy:snappy-java</exclude>
<exclude>org.apache.commons:commons-compress</exclude>
<exclude>org.tukaani:xz</exclude>
<exclude>com.esotericsoftware.kryo:kryo</exclude>
<exclude>com.esotericsoftware.minlog:minlog</exclude>
<exclude>org.objenesis:objenesis</exclude>
<exclude>com.twitter:chill_*</exclude>
<exclude>com.twitter:chill-java</exclude>
<exclude>commons-lang:commons-lang</exclude>
<exclude>junit:junit</exclude>
<exclude>org.apache.commons:commons-lang3</exclude>
<exclude>log4j:log4j</exclude>
<exclude>org.apache.commons:commons-math</exclude>
<exclude>org.apache.sling:org.apache.sling.commons.json</exclude>
<exclude>commons-logging:commons-logging</exclude>
<exclude>commons-codec:commons-codec</exclude>
<exclude>stax:stax-api</exclude>
<exclude>com.typesafe:config</exclude>
<exclude>org.uncommons.maths:uncommons-maths</exclude>
<exclude>com.github.scopt:scopt_*</exclude>
<exclude>commons-io:commons-io</exclude>
<exclude>commons-cli:commons-cli</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>org.apache.flink:*</artifact>
<excludes>
<!-- exclude shaded google but include shaded curator -->
<exclude>org/apache/flink/shaded/com/**</exclude>
<exclude>web-docs/**</exclude>
</excludes>
</filter>
<filter>
<!-- Do not copy the signatures in the META-INF folder. Otherwise,
this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<!-- The ManifestResourceTransformer below adds a Main-Class entry to the
manifest so the jar can be run directly with ./bin/flink run <jar>. -->
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>it.almaviva.wtf.mms.integratemobilitystatusevent.PassingEventProcess</mainClass>
</transformer>
</transformers>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<compilerId>jdt</compilerId>
</configuration>
<dependencies>
<dependency>
<groupId>org.eclipse.tycho</groupId>
<artifactId>tycho-compiler-jdt</artifactId>
<version>${tycho.compiler.version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<compilerId>jdt</compilerId>
</configuration>
<dependencies>
<dependency>
<groupId>org.eclipse.tycho</groupId>
<artifactId>tycho-compiler-jdt</artifactId>
<version>${tycho.compiler.version}</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<versionRange>[2.4,)</versionRange>
<goals>
<goal>single</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
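A quick way to verify whether the Netty exclusions above actually take effect — a diagnostic suggestion added here, not from the original post — is to filter the Maven dependency tree down to Netty artifacts:

mvn dependency:tree -Dincludes=io.netty

Every io.netty artifact still listed under a dependency other than the two excluded DataStax drivers explains why Netty classes keep appearing in the shaded jar.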
2 Answers

Answer 1:
You can disable Netty's native epoll transport and force the default NIO-based transport by adding the JVM argument -Dcom.datastax.driver.FORCE_NIO=true (see https://docs.datastax.com/en/developer/java-driver-dse/1.4/faq/#what-is-netty-s-native-epoll-transport-and-how-do-i-enable-or-disable-it).
In Flink, you have to put that argument into env.java.opts in conf/flink-conf.yaml (see https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#common-options).
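For example, the resulting entry in conf/flink-conf.yaml would look roughly like this (a sketch; quoting of the value may vary by Flink version):

env.java.opts: "-Dcom.datastax.driver.FORCE_NIO=true"

Alternatively — a sketch of my own, not something this answer proposes, assuming the DataStax Java driver 3.x where Cluster.Builder.withNettyOptions and NettyOptions.eventLoopGroup(ThreadFactory) are available — you can pin the driver to the NIO transport in code, which also helps when you cannot edit the Flink configuration:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.NettyOptions;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.util.concurrent.ThreadFactory;

public class NioOnlyClusterFactory {
    public static Cluster build(String contactPoint) {
        // Always hand the driver a NIO event loop group, so it never
        // probes for (and mixes in) the native epoll transport.
        NettyOptions nioOnly = new NettyOptions() {
            @Override
            public EventLoopGroup eventLoopGroup(ThreadFactory threadFactory) {
                return new NioEventLoopGroup(0, threadFactory);
            }
        };
        return Cluster.builder()
                .addContactPoint(contactPoint) // e.g. "127.0.0.1" (placeholder)
                .withNettyOptions(nioOnly)
                .build();
    }
}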
Answer 2:
I added this parameter to conf/flink-conf.yaml and it works like a charm!!!! I had spent hours checking the pom file with colleagues. Thanks!
Refs: https://docs.datastax.com/en/developer/java-driver-dse/1.4/faq/ and https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/config.html#runtime-algorithms