Getting error "java.lang.NoSuchFieldError: LUCENE_4_10_4" in Flink streaming

Posted by 332nm8kg on 2021-06-25 in Flink

I am trying to add Elasticsearch as a sink for my data stream using the following code:

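import java.net.{InetAddress, InetSocketAddress}
import java.util.{ArrayList, HashMap}

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

// Sink configuration: flush after every action, target cluster named "cluster"
val config = new HashMap[String, String]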
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "cluster")

val transports = new ArrayList[InetSocketAddress]
transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))

// testing simple setup
val input: DataStream[String] = timedStream.map(_.language.toString)
input.print()

input.addSink(new ElasticsearchSink(config, transports, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new java.util.HashMap[String, AnyRef]
    // Map stream fields to JSON properties, format:
    // json.put("json-property-name", streamField)
    json.put("data", element)
    Requests.indexRequest.index("test").`type`("test").source(json)
  }

  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
    indexer.add(createIndexRequest(element))
  }
}))

My pom file dependencies are:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.3.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>1.3.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.3.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>1.3.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.10</artifactId>
        <version>1.3.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-twitter_2.10</artifactId>
        <version>1.3.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
        <version>1.3.2</version>
    </dependency>

</dependencies>

I also have Elasticsearch 2.4.6 installed on my system, and when running the program I get the following error:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: java.lang.NoSuchFieldError: LUCENE_4_10_4
at org.elasticsearch.Version.<clinit>(Version.java:230)
at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:129)
at org.apache.flink.streaming.connectors.elasticsearch2.Elasticsearch2ApiCallBridge.createClient(Elasticsearch2ApiCallBridge.java:65)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:272)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)

What is the problem? I tried including Elasticsearch 2.4.6 in the pom, but that did not help.
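
One way to narrow this down, offered as a minimal diagnostic sketch rather than a confirmed fix: a NoSuchFieldError generally means a class was compiled against one version of a library while a different version was loaded at runtime. The trace shows org.elasticsearch.Version failing in its static initializer while looking up LUCENE_4_10_4. Assuming that field belongs to org.apache.lucene.util.Version (the trace does not name the owning class), it may help to check which jar supplies that class and whether it declares the field. The ClasspathCheck object and its helper names are hypothetical; only JDK reflection is used.

```scala
import scala.util.Try

// Hypothetical helper, not part of Flink or Elasticsearch.
object ClasspathCheck {

  // Loads the class WITHOUT running its static initializer, so a broken
  // <clinit> (as in the trace above) does not abort the check itself.
  private def load(className: String): Option[Class[_]] =
    Try(Class.forName(className, false, getClass.getClassLoader)).toOption

  /** Jar or directory the class was loaded from, if it can be determined. */
  def locationOf(className: String): Option[String] =
    for {
      cls    <- load(className)
      source <- Option(cls.getProtectionDomain.getCodeSource)
      url    <- Option(source.getLocation)
    } yield url.toString

  /** True if the class is loadable and has a public field with this name. */
  def hasField(className: String, fieldName: String): Boolean =
    load(className).exists(cls => Try(cls.getField(fieldName)).isSuccess)

  def main(args: Array[String]): Unit = {
    // Assumption: LUCENE_4_10_4 is expected on Lucene's Version class.
    val lucene = "org.apache.lucene.util.Version"
    val es     = "org.elasticsearch.Version"
    println(s"$lucene from: ${locationOf(lucene).getOrElse("<not found>")}")
    println(s"$es from: ${locationOf(es).getOrElse("<not found>")}")
    println(s"$lucene declares LUCENE_4_10_4: ${hasField(lucene, "LUCENE_4_10_4")}")
  }
}
```

Run with the same classpath as the Flink job. If the Lucene jar it reports is not the one Elasticsearch 2.4.6 expects, `mvn dependency:tree -Dincludes=org.apache.lucene` should show which dependency pulls it in.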
