我使用的版本是合流Kafka5.1.14
我试着运行kafka连接器elasticsearch将数据从kafka发送到elasticsearch我在独立模式下测试了这个配置,一切正常。以下是elasticsearch接收器的独立配置
name=elasticsearch-sink-standalone
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics.regex=^[a-zA-Z0-9]((?!-raw$)[a-zA-Z0-9-_])+[a-zA-Z0-9]$
connection.url=http://elasticsearch:9200
type.name=_doc
key.ignore=true
schema.ignore=true
但当我使用相同的设置在分布式模式下创建接收器时,它会出错。这是创建接收器时请求的post主体
{
"name" : "connector-test",
"config" : {
"connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max" : "1",
"topics.regex" : "^[a-zA-Z0-9]((?!-raw$)[a-zA-Z0-9-_])+[a-zA-Z0-9]$",
"connection.url" : "http://elasitcsearch:9200",
"type.name" : "_doc",
"key.ignore" : "true",
"schema.ignore" : "true"
}
}
这就是我得到的错误
[2020-08-04 18:18:35,911] ERROR WorkerSinkTask{id=connector-test-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:512)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:492)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:344)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:512)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'status': was expecting ('true', 'false' or 'null')
at [Source: (byte[])"status-task-connector-test-0"; line: 1, column: 8]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'status': was expecting ('true', 'false' or 'null')
at [Source: (byte[])"status-task-connector-test-0"; line: 1, column: 8]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:703)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3532)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2627)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:832)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:729)
at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4043)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2572)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:342)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:512)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:512)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:492)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-08-04 18:18:35,912] ERROR WorkerSinkTask{id=connector-test-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
我可以将分布式模式与其他regex一起使用,因此我假设错误是由regex引起的。但让我困惑的是,独立模式可以与这个正则表达式一起工作。
独立模式和分布式模式都是用运行的(我不确定这个信息是否有用)
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
请告诉我,如果我需要提供更多的信息谢谢!
1条答案
按热度按时间s5a0g9ez1#
我不认为这里的问题是正则表达式。stacktrace清楚地显示,在从源系统(在本例中是kafka,因为连接器是接收器)轮询消息之后,正在尝试序列化消息,并且发生something-2-json错误。您需要了解导致转换错误的记录的格式。
regex用于知道连接器应该从哪些主题读取记录,这似乎与预期的一样有效。