Kafka Connect sink task ignores tolerance limits

jxct1oxe  posted on 2021-06-06  in  Kafka

I am trying to make a sink connector skip bad messages by setting the errors.tolerance: all option. Full connector configuration:

{
    "name": "crm_data-sink_pandora",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 6,
        "topics": "crm_account_detail,crm_account_on_competitors,crm_event,crm_event_participation",
        "connection.url": "jdbc:postgresql://dburl/service?prepareThreshold=0",
        "connection.user": "pandora.app",
        "connection.password": "*******",
        "dialect.name": "PostgreSqlDatabaseDialect",
        "insert.mode": "upsert",
        "pk.mode": "record_value",
        "pk.fields": "guid",
        "table.name.format": "pandora.${topic}",
        "errors.tolerance": "all",
        "errors.log.enable": true,
        "errors.log.include.messages": true,
        "errors.deadletterqueue.topic.name": "crm_data_deadletterqueue",
        "errors.deadletterqueue.context.headers.enable": true
    }
}

Target table DDL:

create table crm_event_participation
(
  guid              char(36) not null
    constraint crm_event_participation_pkey
      primary key,
  created_on        timestamp,
  created_by_guid   char(36),
  modified_on       timestamp,
  modified_by_guid  char(36),
  process_listeners integer,
  event_guid        char(36),
  event_response    varchar(250),
  note              varchar(500),
  is_from_group     boolean,
  contact_guid      char(36),
  target_item       integer,
  account_guid      char(36),
  employer_id       integer
);

The connector starts successfully, but a task fails as soon as an error occurs (for example, a missing field). Output of curl -X GET http://kafka-connect:9092/connectors/crm_data-sink_pandora/status :

{
    "name": "crm_data-sink_pandora",
    "connector": {
        "state": "RUNNING",
        "worker_id": "192.168.2.254:10900"
    },
    "tasks": [
        {
            "state": "FAILED",
            "trace": 
              "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
                 at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
                 at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
                 at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
                 at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
                 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
                 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
                 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
                 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                 at java.lang.Thread.run(Thread.java:748)
              Caused by: org.apache.kafka.connect.errors.ConnectException: Table \"pandora\".\"crm_event_participation\" is missing fields ([SinkRecordField{schema=Schema{STRING}, name='event_id', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRING}, name='event_response_guid', isPrimaryKey=false}]) and auto-evolution is disabled
                 at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:140)
                 at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:73)
                 at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:84)
                 at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:65)
                 at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:73)
                 at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
                 ... 10 more",
            "id": 0,
            "worker_id": "192.168.2.254:10900"
        }
        ...
    ]
}

Exception in the log:

[2019-03-29 16:59:30,924] INFO Unable to find fields [SinkRecordField{schema=Schema{INT32}, name='process_listners', isPrimaryKey=false}] among column names [employer_id, modified_on, modified_by_guid, contact_guid, target_item, guid, created_on, process_listeners, event_guid, created_by_guid, is_from_group, account_guid, event_response, note] (io.confluent.connect.jdbc.sink.DbStructure)
[2019-03-29 16:59:30,924] ERROR WorkerSinkTask{id=crm_data-sink_pandora-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: Table "pandora"."crm_event_participation" is missing fields ([SinkRecordField{schema=Schema{INT32}, name='process_listners', isPrimaryKey=false}]) and auto-evolution is disabled
  at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:140)
  at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:73)
  at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:84)
  at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:65)
  at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:73)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Could you explain what might be wrong in the connector configuration? I am using Kafka 2.0.0 and JdbcSinkConnector 5.1.0.

hgncfbus  #1

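Your Kafka message contains a field named process_listners. The table has no column with that name.
I think you have a typo: the table has the column process_listeners, not process_listners. The errors.tolerance property applies only to errors that occur while a message is being converted or transformed; it does not cover exceptions thrown by the sink task itself when it writes to the database, which is where this failure occurs (JdbcSinkTask.put in the stack trace). You can read more about errors.tolerance in: Kafka Connect - JDBC Sink SQL exception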
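
If the producer cannot be fixed right away, one possible workaround is to rename the misspelled field inside the connector with Kafka's built-in ReplaceField single message transform. The fragment below is only a sketch of extra keys to merge into the "config" object of the connector. The alias fixTypo is an arbitrary name chosen for illustration, and the rename assumes that process_listners really is meant to land in the process_listeners column. The other fields reported as missing in the status output (event_id, event_response_guid) are not addressed here and would need their own handling.

```json
{
    "transforms": "fixTypo",
    "transforms.fixTypo.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.fixTypo.renames": "process_listners:process_listeners"
}
```

After updating the configuration, the failed tasks still have to be restarted manually, as the log message says.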
