如何在ApacheKafka中从远程数据库中提取数据?

qnyhuwrf  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(680)

我想用apachekafka制作实时数据管道。我有一个数据库,它位于远程位置,该数据库不断更新。有谁知道我应该用哪个kafkaconnectapi来从数据库中提取数据并实时地将数据导入kafka代理?稍后,我将使用kafka流和ksql运行特殊查询来执行度量。
任何帮助都将不胜感激!

izj3ouym

izj3ouym1#

如果您想创建一个实时数据管道,您需要使用一个变更数据捕获(cdc)工具,它能够从mysql流式传输变更。我建议使用debezium,它是一个用于捕获变更数据的开源分布式平台。
捕获插入
向表中添加新记录时,将生成类似于以下内容的json:

{  
   "payload":{  
      "before":null,
      "after":{  
         "id":1005,
         "first_name":"Giorgos",
         "last_name":"Myrianthous",
         "email":"giorgos@abc.com"
      },
      "source":{  
         "name":"dbserver1",
         "server_id":223344,
         "ts_sec":1500369632,
         "gtid":null,
         "file":"mysql-bin.000003",
         "pos":364,
         "row":0,
         "snapshot":null,
         "thread":13,
         "db":"inventory",
         "table":"customers"
      },
      "op":"c",
      "ts_ms":1500369632095
   }
}
``` `before` 对象为空且 `after` 对象包含新插入的值。请注意 `op` 属性为 `c` ,表示这是一个创建事件。
捕获更新
假设 `email` 属性已更新,将生成类似于以下内容的json:

{
"payload":{
"before":{
"id":1005,
"first_name":"Giorgos",
"last_name":"Myrianthous",
"email":"giorgos@abc.com"
},
"after":{
"id":1005,
"first_name":"Giorgos",
"last_name":"Myrianthous",
"email":"newEmail@abc.com"
},
"source":{
"name":"dbserver1",
"server_id":223344,
"ts_sec":1500369929,
"gtid":null,
"file":"mysql-bin.000003",
"pos":673,
"row":0,
"snapshot":null,
"thread":13,
"db":"inventory",
"table":"customers"
},
"op":"u",
"ts_ms":1500369929464
}
}

通知 `op` 这就是现在 `u` ,表示这是一个更新事件。 `before` 对象显示更新和 `after` 对象捕获更新行的当前状态。
捕获删除
现在假设行已被删除;

{
"payload":{
"before":{
"id":1005,
"first_name":"Giorgos",
"last_name":"Myrianthous",
"email":"newEmail@abc.com"
},
"after":null,
"source":{
"name":"dbserver1",
"server_id":223344,
"ts_sec":1500370394,
"gtid":null,
"file":"mysql-bin.000003",
"pos":1025,
"row":0,
"snapshot":null,
"thread":13,
"db":"inventory",
"table":"customers"
},
"op":"d",
"ts_ms":1500370394589
}
}
``` op 新的等于 d ,表示删除事件。 after 属性将为null,并且 before 对象包含删除前的行。
你也可以看看他们网站上提供的大量教程。
edit:example configuration 对于mysql数据库

{
  "name": "inventory-connector",  (1)
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector", (2)
    "database.hostname": "192.168.99.100", (3)
    "database.port": "3306", (4)
    "database.user": "debezium", (5)
    "database.password": "dbz", (6)
    "database.server.id": "184054", (7)
    "database.server.name": "fullfillment", (8)
    "database.whitelist": "inventory", (9)
    "database.history.kafka.bootstrap.servers": "kafka:9092", (10)
    "database.history.kafka.topic": "dbhistory.fullfillment" (11)
    "include.schema.changes": "true" (12)
  }
}

1在kafka connect服务中注册连接器时的名称。
2此mysql连接器类的名称。
3 mysql服务器的地址。
4 mysql服务器的端口号。
5具有所需权限的mysql用户的名称。
6具有所需权限的mysql用户的密码。
7连接器的标识符,在mysql集群中必须是唯一的,并且类似于mysql的服务器id配置属性。
8 mysql服务器/集群的逻辑名称,形成一个名称空间,用于连接器写入的所有Kafka主题的名称、Kafka连接模式名称以及使用avro连接器时对应的avro模式的名称空间。
9此连接器将监视的此服务器托管的所有数据库的列表。这是可选的,并且还有其他属性用于列出要从监视中包括或排除的数据库和表。
10此连接器将用于将ddl语句写入和恢复到数据库历史主题的kafka代理的列表。
11连接器将在其中写入和恢复ddl语句的数据库历史记录主题的名称。本主题仅供内部使用,不应由消费者使用。
12指定连接器应在名为fullfillment events的架构更改主题上生成的标志,其中包含可由使用者使用的ddl更改。

hk8txs48

hk8txs482#

如果您是从mysql数据库读取数据,请使用confluent的jdbc源连接器。https://github.com/confluentinc/kafka-connect-jdbc/ 您还需要下载mysql驱动程序并将其与kafka jars放在一起:https://dev.mysql.com/downloads/connector/j/5.1.html

相关问题