flink kafka弹力纱

xuo3flqw  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(322)

独立模式没有问题。但是Yarn模式,它不能工作。我在flink类路径(flink/lib)中添加了hadoopjar
https://issues.apache.org/jira/browse/flink-3373
https://issues.apache.org/jira/browse/flink-6126
https://issues.apache.org/jira/browse/flink-4587
是在flink 1.2里解决的?
Flinkversion:1.9.1
hadoop/Yarnversion:2.6.0
ElasticSearchversion:6.8.1
maven依赖项

<properties>
        <compiler.version>1.8</compiler.version>
        <flink.version>1.9.0</flink.version>
        <java.version>1.8</java.version>
        <log4j.version>2.6.2</log4j.version>

        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

    </properties>

<dependencies>
        <!-- kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- flink java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- flink steam api -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.8.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.8.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.elasticsearch</groupId>
                    <artifactId>elasticsearch</artifactId>
                </exclusion>

            </exclusions>
        </dependency>

        <!-- log -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.60</version>
        </dependency>

    </dependencies>

堆栈跟踪:

2020-05-21 16:39:01,918 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The program caused an error: 
 at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
 at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
 at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
 at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
 at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchFieldError: INSTANCE
 at org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.<init>(DefaultHttpRequestWriterFactory.java:53)
 at org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.<init>(DefaultHttpRequestWriterFactory.java:57)
 at org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.<clinit>(DefaultHttpRequestWriterFactory.java:47)
 at org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.<init>(ManagedNHttpClientConnectionFactory.java:75)
 at org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.<init>(ManagedNHttpClientConnectionFactory.java:83)
 at org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.<clinit>(ManagedNHttpClientConnectionFactory.java:64)
 at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager$InternalConnectionFactory.<init>(PoolingNHttpClientConnectionManager.java:553)
 at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.<init>(PoolingNHttpClientConnectionManager.java:163)
 at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.<init>(PoolingNHttpClientConnectionManager.java:147)
 at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.<init>(PoolingNHttpClientConnectionManager.java:119)
 at org.apache.http.impl.nio.client.HttpAsyncClientBuilder.build(HttpAsyncClientBuilder.java:668)
 at org.elasticsearch.client.RestClientBuilder$2.run(RestClientBuilder.java:241)
 at org.elasticsearch.client.RestClientBuilder$2.run(RestClientBuilder.java:238)
 at java.security.AccessController.doPrivileged(Native Method)
 at org.elasticsearch.client.RestClientBuilder.createHttpClient(RestClientBuilder.java:238)
 at org.elasticsearch.client.RestClientBuilder.access$000(RestClientBuilder.java:42)
 at org.elasticsearch.client.RestClientBuilder$1.run(RestClientBuilder.java:209)
 at org.elasticsearch.client.RestClientBuilder$1.run(RestClientBuilder.java:206)
 at java.security.AccessController.doPrivileged(Native Method)
 at org.elasticsearch.client.RestClientBuilder.build(RestClientBuilder.java:206)
 at org.elasticsearch.client.RestHighLevelClient.<init>(RestHighLevelClient.java:269)
 at org.elasticsearch.client.RestHighLevelClient.<init>(RestHighLevelClient.java:261)
 at cn.ipaynow.flink.SinkToElasticsearch.<clinit>(SinkToElasticsearch.java:44)
 at cn.ipaynow.Main.main(Main.java:69)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
 at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
 at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
 ... 7 more
ibrsph3r

ibrsph3r1#

我用 Flink Elasticsearch connector 解决问题。我昨天用函数做的。

<dependencies>
        <!-- kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- flink java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- flink steam api -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>

            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-to-slf4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--        <dependency>-->
        <!--            <groupId>org.elasticsearch</groupId>-->
        <!--            <artifactId>elasticsearch</artifactId>-->
        <!--            <version>6.8.1</version>-->
        <!--            <exclusions>-->
        <!--                <exclusion>-->
        <!--                    <groupId>org.apache.logging.log4j</groupId>-->
        <!--                    <artifactId>log4j-api</artifactId>-->
        <!--                </exclusion>-->
        <!--            </exclusions>-->
        <!--        </dependency>-->
        <!--        <dependency>-->
        <!--            <groupId>org.elasticsearch.client</groupId>-->
        <!--            <artifactId>elasticsearch-rest-high-level-client</artifactId>-->
        <!--            <version>6.8.1</version>-->
        <!--            <exclusions>-->
        <!--                <exclusion>-->
        <!--                    <groupId>org.elasticsearch</groupId>-->
        <!--                    <artifactId>elasticsearch</artifactId>-->
        <!--                </exclusion>-->

        <!--            </exclusions>-->
        <!--        </dependency>-->

        <!-- log -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.60</version>
        </dependency>

    </dependencies>

相关问题