"index_already_exists_exception" - Kafka to Elasticsearch (SSL)

sxpgvts3 asked on 2021-06-07 in Kafka

We have been moving data to Elasticsearch (v5.6.3) with the Kafka Elasticsearch sink connector.
I am using Confluent v5.0.0 and am not seeing any other errors. I have already deleted the index and restarted the Elasticsearch connector, but the same error keeps coming back.
The connector configuration is as follows:

{
  "name": "elasticsearch_topic",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "3",
    "topics": "sample_topic",
    "connection.url": "https://127.0.0.1:9200,https://127.0.0.2:9200",
    "connection.username": "elsatic_user",
    "connection.password": "elastic_user",
    "type.name": "log",
    "flush.timeout.ms": "60000",
    "connection.timeout.ms": "60000",
    "read.timeout.ms": "60000",
    "batch.size": "20",
    "topic.index.map": "sample_topic:elastic_search_index_test",
    "transforms": "extract,insertenv,inserttimestamp,convert_current_ts,routeTS",
    "schema.ignore": "true",
    "transforms.extract.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extract.field": "RE_NUM",
    "transforms.insertenv.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.insertenv.static.field": "_env",
    "transforms.insertenv.static.value": "dev",
    "transforms.inserttimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.inserttimestamp.timestamp.field": "date_time",
    "transforms.convert_current_ts.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.convert_current_ts.target.type": "Timestamp",
    "transforms.convert_current_ts.field": "date_time",
    "transforms.convert_current_ts.format": "yyyy-MM-dd HH:mm:ss.SSSSSS",
    "transforms.routeTS.type": "org.apache.kafka.connect.transforms.TimestampRouter",
    "transforms.routeTS.topic.format": "elastic_search_index_test-${timestamp}",
    "transforms.routeTS.timestamp.format": "yyyyMMdd"
  }
}

So far so good, no issues.
Recently we enabled SSL on Elasticsearch, so I added the username, password, and https to the above configuration and restarted the connectors and workers. Since then I have been seeing index_already_exists_exception, with the error below:

[2018-12-06 03:36:21,487] ERROR WorkerSinkTask{id=elasticsearch_topic-1} 
 Task threw an uncaught and unrecoverable exception 
 (org.apache.kafka.connect.runtime.WorkerTask:177)
 org.apache.kafka.connect.errors.ConnectException: Could not create index 
 'elastic_search_index_test': {"root_cause": 
 [{"type":"index_already_exists_exception","reason":"index 
[elastic_search_index_test/QVgWV8E7RmuSArtIJt3m3g] already exists","index_uuid":"QVgWV8E7RmuSArtIJt3m3g","index":"elastic_search_index_test"}],"type":"index_already_exists_exception","reason":"index [elastic_search_index_test/QVgWV8E7RmuSArtIJt3m3g] already exists","index_uuid":"QVgWV8E7RmuSArtIJt3m3g","index":"elastic_search_index_test"}
    at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createIndices(JestElasticsearchClient.java:238)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.createIndicesForTopics(ElasticsearchWriter.java:330)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.open(ElasticsearchSinkTask.java:157)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:612)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:69)
    at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:672)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:283)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:343)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
 [2018-12-06 03:36:21,487] ERROR WorkerSinkTask{id=elasticsearch_topic-1} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)

Steps I have tried so far:
Stopped the Elasticsearch sink connector and workers
Deleted the index "elastic_search_index_test" from Elasticsearch (via Kibana)
Restarted the workers and the Elasticsearch connector
But I still get the same error (as above).
Could someone please advise what is going wrong?
Thanks in advance!!


6tr1vspr1#

This is a very common error when the connector is started with more than one task (here "tasks.max": "3").
Internally, kafka-connect-elasticsearch does the following:
It checks whether the index exists.
If the index is missing in ES, it creates it.
Problem:
The connector runs with 3 tasks (i.e. 3 threads executing the same code). Several tasks see that the index does not exist and each goes on to create it. The first one succeeds; the others throw index_already_exists_exception because the index has already been created by the first task.
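For illustration only, here is a minimal, self-contained Python sketch of that race. It is toy code, not the connector's implementation; sink_task_open and es_create_index are invented names standing in for the real check/create calls, and the sleep just widens the window in which all tasks see the index as missing.

import threading
import time

# Toy stand-in for the cluster: a shared set of index names. The lock only
# guards the set itself, mimicking Elasticsearch rejecting a duplicate create.
existing_indices = set()
state_lock = threading.Lock()

def es_create_index(name):
    # Mimics PUT /<index>: fails if the index already exists.
    with state_lock:
        if name in existing_indices:
            raise RuntimeError("index_already_exists_exception")
        existing_indices.add(name)

def sink_task_open(task_id, index):
    # What each task effectively does on open(): check, then create.
    exists = index in existing_indices        # step 1: check
    time.sleep(0.01)                          # window in which another task can win
    if not exists:
        try:
            es_create_index(index)            # step 2: create
            print(f"task {task_id}: created {index}")
        except RuntimeError as e:
            print(f"task {task_id}: {e}")     # the losing tasks see this error

threads = [threading.Thread(target=sink_task_open,
                            args=(i, "elastic_search_index_test"))
           for i in range(3)]                 # "tasks.max": "3"
for t in threads:
    t.start()
for t in threads:
    t.join()

Running it typically prints one "created ..." line and two index_already_exists_exception lines, which is exactly the pattern in the log above.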
Solution:
Start the connector with a single task, "tasks.max": "1" (a poor choice if you have a lot of data)
Create the index in ES before running the connector (see the sketch after this list)
Use a distributed lock (such as ZooKeeper)
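As a minimal sketch of the second option, assuming the host, credentials, and index name from the question's config (the CA-bundle path is a placeholder), the index can be created up front through the Elasticsearch REST API so that no connector task has to create it:

import requests

# Values taken from the connector config above; adjust for your cluster.
ES_URL = "https://127.0.0.1:9200"
AUTH = ("elsatic_user", "elastic_user")
INDEX = "elastic_search_index_test"

# PUT /<index> creates the index. Point verify at your CA bundle; use
# verify=False only for throwaway testing with self-signed certificates.
resp = requests.put(f"{ES_URL}/{INDEX}", auth=AUTH, verify="/path/to/ca.pem")

if resp.status_code == 200:
    print("index created")
elif "already_exists_exception" in resp.text:
    print("index already exists - nothing to do")
else:
    resp.raise_for_status()

Note that with TimestampRouter in the transform chain the routed topics also map to daily indices (elastic_search_index_test-yyyyMMdd), so an index template in ES is a convenient way to cover those up front as well.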
