kafka jdbc源连接器插入或更新

quhf5bfb  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(395)

我配置了一个kafka jdbc源连接器,以便将postgresql数据库中更改的记录(插入或更新)推送到kafka主题上。我使用“时间戳+递增”模式。似乎还不错。我没有配置jdbc接收器连接器,因为我使用的是一个kafka使用者,它可以监听这个主题。
主题上的消息是json。这是一个例子:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "id"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "entity_create_date"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "entity_modify_date"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "entity_version"
      },
      {
        "type": "string",
        "optional": true,
        "field": "firstname"
      },
      {
        "type": "string",
        "optional": true,
        "field": "lastname"
      }
    ],
    "optional": false,
    "name": "author"
  },
  "payload": {
    "id": 1,
    "entity_create_date": 1600287236682,
    "entity_modify_date": 1600287236682,
    "entity_version": 1,
    "firstname": "George",
    "lastname": "Orwell"
  }
}

如您所见,没有关于源连接器是否因为插入或更新而捕获此更改的信息。我需要这个信息。如何解决?

62o28rlo

62o28rlo1#

您不能使用jdbc源连接器获得这些信息,除非您在源模式和触发器中定制了一些东西。
这就是为什么基于日志的cdc通常是从源数据库获取事件的更好方法的原因之一,其他原因包括:
捕获删除
捕获操作类型
捕获所有事件,而不仅仅是连接器轮询时的事件。
更多关于这个细节的细节,请看这个博客或基于这个博客的演讲。

lfapxunr

lfapxunr2#

使用@robin moffatt建议的基于cdc的方法可能是处理您的需求的正确方法。结帐https://debezium.io/
但是,查看表数据时,可以在使用者中使用“entity\u create\u date”和“entity\u modify\u date”来确定消息是否在insert或update中。如果“entity\u create\u date”=“entity\u modify\u date”,则它是一个插入,否则它是一个更新。

相关问题