Kafka Connect Elasticsearch connector topics.regex

lo8azlld · posted 2021-06-04 in Kafka

I am using Confluent Kafka 5.1.14.
I am trying to run the Kafka Connect Elasticsearch connector to send data from Kafka to Elasticsearch. I tested the configuration in standalone mode and everything works fine. Here is the standalone configuration for the Elasticsearch sink:

name=elasticsearch-sink-standalone
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics.regex=^[a-zA-Z0-9]((?!-raw$)[a-zA-Z0-9-_])+[a-zA-Z0-9]$
connection.url=http://elasticsearch:9200
type.name=_doc
key.ignore=true
schema.ignore=true
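
For reference, I start the standalone worker roughly like this (the file names are just placeholders for my actual worker and connector property files):

# start a standalone Connect worker with the sink configuration above;
# worker.properties holds the bootstrap servers and the converter settings shown further below
connect-standalone worker.properties elasticsearch-sink-standalone.properties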

But when I create the sink with the same settings in distributed mode, it fails. This is the POST body of the request I use to create the sink:

{
 "name" : "connector-test",
 "config" : {
  "connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "tasks.max" : "1",
  "topics.regex" : "^[a-zA-Z0-9]((?!-raw$)[a-zA-Z0-9-_])+[a-zA-Z0-9]$", 
  "connection.url" : "http://elasitcsearch:9200",
  "type.name" : "_doc",
  "key.ignore" : "true",
  "schema.ignore" : "true"
 }
}
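
For completeness, I create the connector by posting that JSON to the Connect REST API, roughly like this (assuming the worker's REST interface is on the default port 8083; the file name is a placeholder):

# create the connector on the distributed worker via its REST API
curl -X POST -H "Content-Type: application/json" \
     --data @connector-test.json \
     http://localhost:8083/connectors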

This is the error I get:

[2020-08-04 18:18:35,911] ERROR WorkerSinkTask{id=connector-test-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:512)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:492)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:344)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:512)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'status': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"status-task-connector-test-0"; line: 1, column: 8]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'status': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"status-task-connector-test-0"; line: 1, column: 8]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:703)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3532)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2627)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:832)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:729)
    at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4043)
    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2572)
    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:342)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:512)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:512)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:492)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2020-08-04 18:18:35,912] ERROR WorkerSinkTask{id=connector-test-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)

I can use distributed mode with other regexes, so I assumed the error was caused by this particular regex. What confuses me is that standalone mode works fine with it.
Both the standalone and the distributed workers run with the following converter settings (I am not sure whether this is relevant):

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false

Please let me know if I need to provide more information. Thanks!

s5a0g9ez #1

I don't think the problem here is the regex. The stack trace clearly shows that, after polling messages from the source system (Kafka in this case, since the connector is a sink), the worker tries to deserialize them and a something-to-JSON conversion error occurs. You need to look at the format of the records that are causing the conversion failure.
The regex only determines which topics the connector reads records from, and that part appears to be working as expected.
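
For example, one way to check this (a rough sketch: the broker address and topic name are placeholders, and older kafka-topics versions need --zookeeper instead of --bootstrap-server) is to first list the topic names your regex actually matches and then dump the raw keys and values of one of those topics:

# list every topic name the regex would match (grep -P understands the lookahead)
kafka-topics --bootstrap-server localhost:9092 --list \
  | grep -P '^[a-zA-Z0-9]((?!-raw$)[a-zA-Z0-9-_])+[a-zA-Z0-9]$'

# dump keys and values of a matched topic; any record that is not plain JSON will stand out
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic one-of-the-matched-topics --from-beginning \
  --property print.key=true --property key.separator=' | '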
