我想用apachekafka制作实时数据管道。我有一个数据库,它位于远程位置,该数据库不断更新。有谁知道我应该用哪个kafkaconnectapi来从数据库中提取数据并实时地将数据导入kafka代理?稍后,我将使用kafka流和ksql运行特殊查询来执行度量。任何帮助都将不胜感激!
izj3ouym1#
如果您想创建一个实时数据管道,您需要使用一个变更数据捕获(cdc)工具,它能够从mysql流式传输变更。我建议使用debezium,它是一个用于捕获变更数据的开源分布式平台。捕获插入向表中添加新记录时,将生成类似于以下内容的json:
{ "payload":{ "before":null, "after":{ "id":1005, "first_name":"Giorgos", "last_name":"Myrianthous", "email":"giorgos@abc.com" }, "source":{ "name":"dbserver1", "server_id":223344, "ts_sec":1500369632, "gtid":null, "file":"mysql-bin.000003", "pos":364, "row":0, "snapshot":null, "thread":13, "db":"inventory", "table":"customers" }, "op":"c", "ts_ms":1500369632095 } } ``` `before` 对象为空且 `after` 对象包含新插入的值。请注意 `op` 属性为 `c` ,表示这是一个创建事件。 捕获更新 假设 `email` 属性已更新,将生成类似于以下内容的json:
{"payload":{ "before":{ "id":1005,"first_name":"Giorgos","last_name":"Myrianthous","email":"giorgos@abc.com"},"after":{ "id":1005,"first_name":"Giorgos","last_name":"Myrianthous","email":"newEmail@abc.com"},"source":{ "name":"dbserver1","server_id":223344,"ts_sec":1500369929,"gtid":null,"file":"mysql-bin.000003","pos":673,"row":0,"snapshot":null,"thread":13,"db":"inventory","table":"customers"},"op":"u","ts_ms":1500369929464}}
通知 `op` 这就是现在 `u` ,表示这是一个更新事件。 `before` 对象显示更新和 `after` 对象捕获更新行的当前状态。 捕获删除 现在假设行已被删除;
{"payload":{ "before":{ "id":1005,"first_name":"Giorgos","last_name":"Myrianthous","email":"newEmail@abc.com"},"after":null,"source":{ "name":"dbserver1","server_id":223344,"ts_sec":1500370394,"gtid":null,"file":"mysql-bin.000003","pos":1025,"row":0,"snapshot":null,"thread":13,"db":"inventory","table":"customers"},"op":"d","ts_ms":1500370394589}}``` op 新的等于 d ,表示删除事件。 after 属性将为null,并且 before 对象包含删除前的行。你也可以看看他们网站上提供的大量教程。edit:example configuration 对于mysql数据库
op
d
after
before
{ "name": "inventory-connector", (1) "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", (2) "database.hostname": "192.168.99.100", (3) "database.port": "3306", (4) "database.user": "debezium", (5) "database.password": "dbz", (6) "database.server.id": "184054", (7) "database.server.name": "fullfillment", (8) "database.whitelist": "inventory", (9) "database.history.kafka.bootstrap.servers": "kafka:9092", (10) "database.history.kafka.topic": "dbhistory.fullfillment" (11) "include.schema.changes": "true" (12) } }
1在kafka connect服务中注册连接器时的名称。2此mysql连接器类的名称。3 mysql服务器的地址。4 mysql服务器的端口号。5具有所需权限的mysql用户的名称。6具有所需权限的mysql用户的密码。7连接器的标识符,在mysql集群中必须是唯一的,并且类似于mysql的服务器id配置属性。8 mysql服务器/集群的逻辑名称,形成一个名称空间,用于连接器写入的所有Kafka主题的名称、Kafka连接模式名称以及使用avro连接器时对应的avro模式的名称空间。9此连接器将监视的此服务器托管的所有数据库的列表。这是可选的,并且还有其他属性用于列出要从监视中包括或排除的数据库和表。10此连接器将用于将ddl语句写入和恢复到数据库历史主题的kafka代理的列表。11连接器将在其中写入和恢复ddl语句的数据库历史记录主题的名称。本主题仅供内部使用,不应由消费者使用。12指定连接器应在名为fullfillment events的架构更改主题上生成的标志,其中包含可由使用者使用的ddl更改。
hk8txs482#
如果您是从mysql数据库读取数据,请使用confluent的jdbc源连接器。https://github.com/confluentinc/kafka-connect-jdbc/ 您还需要下载mysql驱动程序并将其与kafka jars放在一起:https://dev.mysql.com/downloads/connector/j/5.1.html
2条答案
按热度按时间izj3ouym1#
如果您想创建一个实时数据管道,您需要使用一个变更数据捕获(cdc)工具,它能够从mysql流式传输变更。我建议使用debezium,它是一个用于捕获变更数据的开源分布式平台。
捕获插入
向表中添加新记录时,将生成类似于以下内容的json:
{
"payload":{
"before":{
"id":1005,
"first_name":"Giorgos",
"last_name":"Myrianthous",
"email":"giorgos@abc.com"
},
"after":{
"id":1005,
"first_name":"Giorgos",
"last_name":"Myrianthous",
"email":"newEmail@abc.com"
},
"source":{
"name":"dbserver1",
"server_id":223344,
"ts_sec":1500369929,
"gtid":null,
"file":"mysql-bin.000003",
"pos":673,
"row":0,
"snapshot":null,
"thread":13,
"db":"inventory",
"table":"customers"
},
"op":"u",
"ts_ms":1500369929464
}
}
{
"payload":{
"before":{
"id":1005,
"first_name":"Giorgos",
"last_name":"Myrianthous",
"email":"newEmail@abc.com"
},
"after":null,
"source":{
"name":"dbserver1",
"server_id":223344,
"ts_sec":1500370394,
"gtid":null,
"file":"mysql-bin.000003",
"pos":1025,
"row":0,
"snapshot":null,
"thread":13,
"db":"inventory",
"table":"customers"
},
"op":"d",
"ts_ms":1500370394589
}
}
```
op
新的等于d
,表示删除事件。after
属性将为null,并且before
对象包含删除前的行。你也可以看看他们网站上提供的大量教程。
edit:example configuration 对于mysql数据库
1在kafka connect服务中注册连接器时的名称。
2此mysql连接器类的名称。
3 mysql服务器的地址。
4 mysql服务器的端口号。
5具有所需权限的mysql用户的名称。
6具有所需权限的mysql用户的密码。
7连接器的标识符,在mysql集群中必须是唯一的,并且类似于mysql的服务器id配置属性。
8 mysql服务器/集群的逻辑名称,形成一个名称空间,用于连接器写入的所有Kafka主题的名称、Kafka连接模式名称以及使用avro连接器时对应的avro模式的名称空间。
9此连接器将监视的此服务器托管的所有数据库的列表。这是可选的,并且还有其他属性用于列出要从监视中包括或排除的数据库和表。
10此连接器将用于将ddl语句写入和恢复到数据库历史主题的kafka代理的列表。
11连接器将在其中写入和恢复ddl语句的数据库历史记录主题的名称。本主题仅供内部使用,不应由消费者使用。
12指定连接器应在名为fullfillment events的架构更改主题上生成的标志,其中包含可由使用者使用的ddl更改。
hk8txs482#
如果您是从mysql数据库读取数据,请使用confluent的jdbc源连接器。https://github.com/confluentinc/kafka-connect-jdbc/ 您还需要下载mysql驱动程序并将其与kafka jars放在一起:https://dev.mysql.com/downloads/connector/j/5.1.html