Spring Boot, Camel (3.14.0), and Elasticsearch integration

laik7k3q  posted on 2022-11-07  in  Apache

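I am trying to integrate Spring Boot (2.6.3) with Camel (3.14.0) in order to send a simple query to Elasticsearch.
The project is at https://github.com/saavedrah/springboot-camel-elastic
I have a timer that sends a query to Elasticsearch every 60 seconds. However, the following exception is thrown: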

2022-01-26 11:08:48.036  WARN 23668 --- [1 - timer://foo] o.a.camel.component.timer.TimerConsumer  : Error processing exchange. Exchange[2578AAA06ED30CB-0000000000000000]. Caused by: [org.apache.camel.CamelExecutionException - Exception occurred during execution on the exchange: Exchange[]]

org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[]
    at org.apache.camel.CamelExecutionException.wrapCamelExecutionException(CamelExecutionException.java:45) ~[camel-api-3.14.0.jar:3.14.0]
    at org.apache.camel.support.AbstractExchange.setException(AbstractExchange.java:589) ~[camel-support-3.14.0.jar:3.14.0]
    at org.apache.camel.support.DefaultExchange.setException(DefaultExchange.java:27) ~[camel-support-3.14.0.jar:3.14.0]
    at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:69) ~[camel-support-3.14.0.jar:3.14.0]
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172) ~[camel-core-processor-3.14.0.jar:3.14.0]
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:469) ~[camel-core-processor-3.14.0.jar:3.14.0]
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:187) ~[camel-base-engine-3.14.0.jar:3.14.0]
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64) ~[camel-base-engine-3.14.0.jar:3.14.0]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:184) ~[camel-core-processor-3.14.0.jar:3.14.0]
    at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:398) ~[camel-base-engine-3.14.0.jar:3.14.0]
    at org.apache.camel.component.timer.TimerConsumer.sendTimerExchange(TimerConsumer.java:210) ~[camel-timer-3.14.0.jar:3.14.0]
    at org.apache.camel.component.timer.TimerConsumer$1.run(TimerConsumer.java:76) ~[camel-timer-3.14.0.jar:3.14.0]
    at java.base/java.util.TimerThread.mainLoop(Timer.java:556) ~[na:na]
    at java.base/java.util.TimerThread.run(Timer.java:506) ~[na:na]
Caused by: java.lang.NoClassDefFoundError: org/elasticsearch/common/CheckedConsumer
    at org.apache.camel.component.elasticsearch.ElasticsearchProducer$HighLevelClient.<init>(ElasticsearchProducer.java:347) ~[camel-elasticsearch-rest-3.14.0.jar:3.14.0]
    at org.apache.camel.component.elasticsearch.ElasticsearchProducer$HighLevelClient.<init>(ElasticsearchProducer.java:345) ~[camel-elasticsearch-rest-3.14.0.jar:3.14.0]
    at org.apache.camel.component.elasticsearch.ElasticsearchProducer.process(ElasticsearchProducer.java:124) ~[camel-elasticsearch-rest-3.14.0.jar:3.14.0]
    at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66) ~[camel-support-3.14.0.jar:3.14.0]
    ... 10 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.common.CheckedConsumer
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[na:na]
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[na:na]
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521) ~[na:na]
    ... 14 common frames omitted

Update 1

According to CAMEL-17183, the Elasticsearch version that gets downloaded (7.15.2) does not contain the class [org.elasticsearch.common.CheckedConsumer].
So how should I update gradle.properties to get the supported Elasticsearch version (7.8.0)?
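
One quick way to confirm whether a given class actually ends up on the runtime classpath is to probe for it by name. The sketch below uses only the JDK; `ClasspathProbe` and `isPresent` are hypothetical names chosen for illustration and are not part of Camel or Elasticsearch. It only gives a meaningful answer when run with the same classpath as the application.

```java
// Hypothetical helper (not part of Camel or Elasticsearch): reports whether a
// class can be located on the current classpath.
class ClasspathProbe {

    static boolean isPresent(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Not found, or found but not loadable (for example, a missing dependency)
            return false;
        }
    }

    public static void main(String[] args) {
        String name = "org.elasticsearch.common.CheckedConsumer";
        System.out.println(name + " present: " + isPresent(name));
    }
}
```

If the probe reports false for the class named in the stack trace, the Elasticsearch jars resolved by the build are likely not the ones the Camel component was compiled against.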

Update 2

The Gradle properties were updated to force the Elasticsearch version.

configurations.all {
    resolutionStrategy {
        dependencySubstitution {
            substitute module('org.elasticsearch.client:elasticsearch-rest-high-level-client') with module('org.elasticsearch.client:elasticsearch-rest-high-level-client:7.8.0')
            substitute module('org.elasticsearch.client:elasticsearch-rest-client') with module('org.elasticsearch.client:elasticsearch-rest-client:7.8.0')
            substitute module('org.elasticsearch:elasticsearch') with module('org.elasticsearch:elasticsearch:7.8.0')
        }
    }
}

Now I get the exception below, which I guess is related to insufficient parameters being passed to the query...

java.lang.IllegalArgumentException: Wrong body type. Only Map, String or SearchRequest is allowed as a type
    at org.apache.camel.component.elasticsearch.ElasticsearchProducer.process(ElasticsearchProducer.java:232)
    at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:469)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:187)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:184)
    at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:398)
    at org.apache.camel.component.timer.TimerConsumer.sendTimerExchange(TimerConsumer.java:210)
    at org.apache.camel.component.timer.TimerConsumer$1.run(TimerConsumer.java:76)
    at java.base/java.util.TimerThread.mainLoop(Timer.java:556)
    at java.base/java.util.TimerThread.run(Timer.java:506)
13:13:25.219 [Camel (camel-1) thread #1 - timer://foo] WARN  o.a.c.component.timer.TimerConsumer - Error processing exchange. Exchange[DB28E010007B18C-0000000000000006]. Caused by: [java.lang.IllegalArgumentException - Wrong body type. Only Map, String or SearchRequest is allowed as a type]
java.lang.IllegalArgumentException: Wrong body type. Only Map, String or SearchRequest is allowed as a type
    at org.apache.camel.component.elasticsearch.ElasticsearchProducer.process(ElasticsearchProducer.java:232)
    at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:469)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:187)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:184)
    at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:398)
    at org.apache.camel.component.timer.TimerConsumer.sendTimerExchange(TimerConsumer.java:210)
    at org.apache.camel.component.timer.TimerConsumer$1.run(TimerConsumer.java:76)
    at java.base/java.util.TimerThread.mainLoop(Timer.java:556)
    at java.base/java.util.TimerThread.run(Timer.java:506)
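
The message suggests that the producer inspects the type of the message body before it builds the search request. A timer exchange carries no body unless the route sets one, so the body is null by the time it reaches the Search operation. The sketch below is a simplified stand-in for that kind of check, not Camel's actual code: `BodyTypeCheck` and `isAcceptedBody` are hypothetical names, and `SearchRequest` is deliberately left out so the example runs on the JDK alone.

```java
import java.util.Map;

// Simplified illustration only; the real check lives inside the Camel component.
class BodyTypeCheck {

    // Mirrors the types named in the error message, minus SearchRequest
    // (omitted here to avoid a dependency on the Elasticsearch client).
    static boolean isAcceptedBody(Object body) {
        return body instanceof Map || body instanceof String;
    }

    public static void main(String[] args) {
        Object timerBody = null; // what a timer exchange carries when no body is set
        System.out.println("null body accepted: " + isAcceptedBody(timerBody));
        System.out.println("JSON string accepted: "
                + isAcceptedBody("{\"query\":{\"match_all\":{}}}"));
    }
}
```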

Any help is appreciated.

cigdeys3  #1

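What Huang told me is that I should put a query into the message body produced by the timer component so that it gets passed to the Search operation, something like: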

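// Fragment from inside RouteBuilder.configure(); it needs imports for
// org.apache.camel.Exchange and org.apache.camel.Processor.
from("timer://foo?fixedRate=true&period=60000")
  .process(new Processor() {
    @Override
    public void process(Exchange exchange) throws Exception {
      // The timer exchange has no body, so set the query as a JSON String
      exchange.getIn().setBody("{\"query\":{\"match_all\":{}}}");
    }
  })
  .to("direct:search");

// Run the Search operation against the "twitter" index with the body set above
from("direct:search")
  .to("elasticsearch-rest://elasticsearch?operation=Search&indexName=twitter");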
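
The error message also lists Map as an accepted body type, so the same match_all query could presumably be supplied as nested maps instead of a hand-escaped JSON string. The following is a minimal JDK-only sketch; `QueryBodies` and `matchAll` are hypothetical names, and how the component turns the Map into a search request is an assumption that should be checked against the camel-elasticsearch-rest documentation.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that builds the match_all query as nested maps.
class QueryBodies {

    static Map<String, Object> matchAll() {
        // Inner object: {"match_all": {}}
        Map<String, Object> query = new LinkedHashMap<>();
        query.put("match_all", Collections.emptyMap());

        // Outer object: {"query": {...}}
        Map<String, Object> body = new LinkedHashMap<>();
        body.put("query", query);
        return body;
    }

    public static void main(String[] args) {
        System.out.println(matchAll());
    }
}
```

In the route, the body would then be set with exchange.getIn().setBody(QueryBodies.matchAll()) in place of the JSON string.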
  • Thanks, thanks
