How do I connect Elasticsearch to Kafka?

zxlwwiss, posted on 2021-06-06 in Kafka

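I'm new to Kafka and Elasticsearch.
I want to use a connector to push data from Elasticsearch into a Kafka topic.
I don't want Kafka to Elasticsearch; I want Elasticsearch to Kafka.
I built this with Logstash, but that isn't really what I want: I want Kafka to listen for all changes in Elasticsearch and pull them into the topic.
I found a plugin jar for a Confluent connector [here][1], but it doesn't work and I get this error: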

{"error":{"root_cause":[{"type":"query_shard_exception","reason":"No mapping found for [@timestamp] in order to sort on","index_uuid":"bkQiBFqdQL-hCbWApo6bBQ","index":"metric"}],"type":"search_phase_execution_exception","reason":"all shards failed","phase":"query","grouped":true,"failed_shards":[{"shard":0,"index":"metric","node":"wMGlz0c9Q5yViW0bsKhKOA","reason":{"type":"query_shard_exception","reason":"No mapping found for [@timestamp] in order to sort on","index_uuid":"bkQiBFqdQL-hCbWApo6bBQ","index":"metric"}}]},"status":400}
            at org.elasticsearch.client.RestClient$1.completed(RestClient.java:357)
            at org.elasticsearch.client.RestClient$1.completed(RestClient.java:346)
            at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
            at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
            at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:432)
            at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:325)
            at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:267)
            at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
            at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
            at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:116)
            at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:164)
            at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:339)
            at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:317)
            at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:278)
            at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:106)
            at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:590)
            ... 1 more

The connector uses this configuration:

{
    "name": "elastic-source",
    "config": {
        "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
        "tasks.max": "1",
        "es.host": "localhost",
        "es.port": "9200",
        "index.prefix": "metric",
        "topic.prefix": "es_",
        "incrementing.field.name": "@timestamp"
    }
}
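
For context, a source that tracks an incrementing field generally works by searching repeatedly for documents whose field value is greater than the last one seen, sorted in ascending order. Below is a minimal Python sketch of such a request body. It illustrates the general technique only and is not taken from the connector's source; the function name `build_poll_query` and the `size` default are hypothetical.

```python
import json


def build_poll_query(field, last_value=None, size=100):
    """Build a search body sorted ascending on `field`.

    When `last_value` is known, only documents with a greater value are
    requested; otherwise every document matches (first poll).
    """
    body = {
        "size": size,
        "sort": [{field: {"order": "asc"}}],
    }
    if last_value is None:
        body["query"] = {"match_all": {}}
    else:
        body["query"] = {"range": {field: {"gt": last_value}}}
    return body


# First poll: no offset yet. Later polls: resume after the last value seen.
first = build_poll_query("fields.release_date")
nxt = build_poll_query("fields.release_date", last_value="2013-01-18T00:00:00Z")
print(json.dumps(nxt, indent=2))
```

A sort like this only succeeds if the field is actually mapped in the index, which is what the "No mapping found for [@timestamp] in order to sort on" error above complains about.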
Here is a sample of the data imported into my index:

{"index":{"_index": "movies","_type":"movie","_id":1}}
{"fields" : {"directors" : ["Joseph Gordon-Levitt"],"release_date" : "2013-01-18T00:00:00Z","rating" : 7.4,"genres" : ["Comedy","Drama"],"image_url" : "http://ia.media-imdb.com/images/M/MV5BMTQxNTc3NDM2MF5BMl5BanBnXkFtZTcwNzQ5NTQ3OQ@@._V1_SX400_.jpg","plot" : "A New Jersey guy dedicated to his family, friends, and church, develops unrealistic expectations from watching porn and works to find happiness and intimacy with his potential true love.","title" : "Don Jon","rank" : 1,"running_time_secs" : 5400,"actors" : ["Joseph Gordon-Levitt","Scarlett Johansson","Julianne Moore"],"year" : 2013},"id" : "tt2229499","type" : "add"}
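
Note that the sample document above contains no `@timestamp` field, and the first error says the `metric` index has no mapping for it either. One way to confirm this is to fetch the index mapping and check whether the field path is defined. Below is a minimal Python sketch of that check; the helper `has_mapped_field` is hypothetical, and `sample_properties` is a hand-written guess at what a mapping for the sample document might look like, not output from a real cluster.

```python
def has_mapped_field(properties, path):
    """Return True if the dotted `path` is defined under a mapping's `properties`."""
    node = properties
    parts = path.split(".")
    for i, part in enumerate(parts):
        if part not in node:
            return False
        if i == len(parts) - 1:
            return True
        # Descend into the sub-fields of an object field, if it has any.
        node = node[part].get("properties", {})
    return False


# Hypothetical mapping mirroring the sample document (assumed, not real output).
sample_properties = {
    "fields": {
        "properties": {
            "directors": {"type": "text"},
            "release_date": {"type": "date"},
            "rating": {"type": "float"},
            "year": {"type": "long"},
        }
    },
    "id": {"type": "keyword"},
    "type": {"type": "keyword"},
}

print(has_mapped_field(sample_properties, "@timestamp"))
print(has_mapped_field(sample_properties, "fields.release_date"))
```

If the check fails for the configured `incrementing.field.name`, the sort cannot work; a date field that does exist in the documents, such as `fields.release_date`, may be a better candidate.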

Thanks for your help! :)
Edit
I changed the index to use this data:

{"index":{"_index": "test","_type":"test","_id":1}}
{"fields" : {"directors" : "Joseph Gordon-Levitt","release_date" : "2013-01-18T00:00:00Z"}}

and this configuration:

{
    "name": "elastic-source",
    "config": {
        "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
        "tasks.max": "1",
        "es.host": "localhost",
        "es.port": "9200",
        "index.prefix": "test",
        "topic.prefix": "es_",
        "incrementing.field.name": "_score"
    }
}

Now I get this error:

[2018-09-28 11:00:06,367] INFO index test-elasticsearch-sink total messages: null  (com.github.dariobalinzo.task.ElasticSourceTask:149)
[2018-09-28 11:00:06,367] INFO fetching from test (com.github.dariobalinzo.task.ElasticSourceTask:143)
[2018-09-28 11:00:06,373] INFO total shard 5, successuful: 5 (com.github.dariobalinzo.task.ElasticSourceTask:200)
[2018-09-28 11:00:06,373] ERROR error fetching min value (com.github.dariobalinzo.task.ElasticSourceTask:220)
java.lang.NullPointerException
        at com.github.dariobalinzo.task.ElasticSourceTask.fetchLastOffset(ElasticSourceTask.java:215)
        at com.github.dariobalinzo.task.ElasticSourceTask.lambda$poll$0(ElasticSourceTask.java:144)
        at java.util.Arrays$ArrayList.forEach(Arrays.java:3880)
        at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Do you have any idea? Thank you!
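
A possible lead on the second error: `_score` is the relevance score Elasticsearch computes for each search hit, not a field stored in the document. Assuming the connector reads the incrementing value from each hit's `_source` (an assumption, not verified against the connector's code), the lookup would come back empty, which would be consistent with the NullPointerException in `fetchLastOffset`. The Python sketch below shows the difference; `incrementing_value` is a hypothetical helper and `hit` is a hand-written example shaped like a search hit for the `test` document.

```python
def incrementing_value(hit, path):
    """Walk the dotted `path` through the hit's `_source`; return None if absent."""
    node = hit.get("_source", {})
    for part in path.split("."):
        if not isinstance(node, dict) or part not in node:
            return None
        node = node[part]
    return node


# Hand-written example of a search hit (assumed shape, not real output).
hit = {
    "_index": "test",
    "_type": "test",
    "_id": "1",
    "_score": 1.0,  # hit metadata, sits next to `_source`, not inside it
    "_source": {
        "fields": {
            "directors": "Joseph Gordon-Levitt",
            "release_date": "2013-01-18T00:00:00Z",
        }
    },
}

print(incrementing_value(hit, "_score"))
print(incrementing_value(hit, "fields.release_date"))
```

Under that assumption, pointing `incrementing.field.name` at a real document field might avoid the null value.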

[1]: https://medium.com/@dariobalinzo/kafka-connect-elasticsearch-source-connector-1a8c16a0e8eb
