mysql融合企业kafka数据摄取

z4bn682m  于 2021-06-06  发布在  Kafka
关注(0)|答案(3)|浏览(292)

我们有一个3节点的融合企业kafka集群(linux on prem),一个节点运行kafka connect服务。我们希望使用mysql将数据摄取到kafka主题中。
我试过跟踪-
1.在本地windows桌面上安装mysql,创建db,table,并在其中插入一些数据。
2.创建了一个 source-quickstart-mysql.properties 包含以下详细信息的文件-

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://<IPAddressOfLocalMachine>:3306/test_db?user=root&password=pwd
tables.whitelist=emp
mode=incrementing
incrementing.column.name=empid
topic.prefix=test-mysql-jdbc-

这个 connect-standalone.properties 有以下信息:

bootstrap.servers=IPaddressOfKCnode:9092
plugin.path=/usr/share/java

重新启动kafka连接服务
试图提交kafka连接服务请求以连接到我的sql- curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" IPaddressOfKCnode:8083/connectors/ -d '{"name": "emp-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "connection.url": "jdbc:mysql://IPaddressOfLocalMachine:3306/test_db?user=root&password=pwd","table.whitelist": "emp","mode": "timestamp","topic.prefix": "mysql-" } }' 在此处获取以下错误:

{"error_code":400,"message":"Connector configuration is invalid and contains the following 2 error(s):\nInvalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd for configuration Couldn't open connection to jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd\nInvalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=admin for configuration Couldn't open connection to jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}

我也尝试过以下事情-
a、 已停止kafka connect服务并手动运行-

systemctl stop confluent-kafka-connect

b。像这样运行连接

/usr/bin/connect-standalone /etc/kafka/connect-standalone.properties /etc/kafka-connect-jdbc/source-quickstart-mysql.properties

这个过程一开始就成功开始了,但过了一段时间就结束了。以下是日志:

[2018-11-10 19:42:53,027] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:55)
[2018-11-10 19:42:53,048] INFO AbstractConfig values:
        batch.max.rows = 100
        catalog.pattern = null
        connection.attempts = 3
        connection.backoff.ms = 10000
        connection.password = null
        connection.url = jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd
        connection.user = null
        dialect.name =
        incrementing.column.name = empid
        mode = incrementing
        numeric.mapping = null
        numeric.precision.mapping = false
        poll.interval.ms = 5000
        query =
        schema.pattern = null
        table.blacklist = []
        table.poll.interval.ms = 60000
        table.types = [TABLE]
        table.whitelist = []
        timestamp.column.name = []
        timestamp.delay.interval.ms = 0
        topic.prefix = test-mysql-jdbc-
        validate.non.null = true
 (org.apache.kafka.common.config.AbstractConfig:279)
[2018-11-10 19:45:00,439] INFO AbstractConfig values:
        batch.max.rows = 100
        catalog.pattern = null
        connection.attempts = 3
        connection.backoff.ms = 10000
        connection.password = null
        connection.url = jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=admin
        connection.user = null
        dialect.name =
        incrementing.column.name = empid
        mode = incrementing
        numeric.mapping = null
        numeric.precision.mapping = false
        poll.interval.ms = 5000
        query =
        schema.pattern = null
        table.blacklist = []
        table.poll.interval.ms = 60000
        table.types = [TABLE]
        table.whitelist = []
        timestamp.column.name = []
        timestamp.delay.interval.ms = 0
        topic.prefix = test-mysql-jdbc-
        validate.non.null = true
 (org.apache.kafka.common.config.AbstractConfig:279)
[2018-11-10 19:47:07,666] ERROR Failed to create job for /etc/kafka-connect-jdbc/source-quickstart-mysql.properties (org.apache.kafka.connect.cli.ConnectStandalone:102)
[2018-11-10 19:47:07,668] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:113)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://192.168.178.14:3306/test_db?user=root&password=admin
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://192.168.178.14:3306/test_db?user=root&password=admin
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
        at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
        at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:110)
Caused by: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://192.168.178.14:3306/test_db?user=root&password=admin
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://192.168.178.14:3306/test_db?user=root&password=admin
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
        at org.apache.kafka.connect.runtime.AbstractHerder.maybeAddConfigErrors(AbstractHerder.java:415)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:189)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
[2018-11-10 19:47:07,669] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:65)

我在confluent网站上找不到一个平滑和适当的文档来使用kafka connect服务的各种连接器、配置等。请帮助获得实现数据摄取管道的正确步骤:mysql--kafkaconnect--kafka
最后,我希望mysql表中的insert在kafka主题中生成数据,kafka使用者将显示这些记录。这种摄取似乎很简单,但我遗漏了一些基本的连接属性:(
谢谢!

ymzxtsji

ymzxtsji1#

在第一个示例中,您的错误是从 curl 命令输出:
Connector configuration is invalid and contains the following 2 error(s) java.sql.SQLException: No suitable driver found for jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd 因此,您在kafka connect路径中丢失了用于mysql的jdbc驱动程序。
第二个错误在您发布的输出中: Connector configuration is invalid and contains the following 2 error(s): Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure. The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://192.168.178.14:3306/test_db?user=root&password=admin 这表明kafka connect在访问mysql机器时出现了问题。
你从哪里运行confluent平台,它在docker中,mysql的本地机器中,等等?是 192.168.178.14 您的mysql服务器的地址,以及可以从运行kafka connect的主机访问它吗?
您可以找到几个使用kafka设置mysql的示例:
https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1/ -正如您所做的那样,它使用jdbc连接器
https://rmoff.net/2018/03/24/streaming-data-from-mysql-into-kafka-with-kafka-connect-and-debezium/ -这使用了debezium,一个基于日志的cdc工具
有关jdbc连接器与基于日志的cdc的优缺点,请参见https://www.confluent.io/blog/no-more-silos-how-to-integrate-your-databases-with-apache-kafka-and-cdc.
免责声明:我写了上述博客文章。

w9apscun

w9apscun2#

感谢robin和giorgos的回答!这帮了大忙。这个问题与以下几点有关-1。缺少mysql jdbc连接器jar。我们必须将mysql connector/j8.0.13放在 /usr/share/java/kafka-connect-jdbc/ .
2.导致连接问题的原因是kafka connect尝试连接的mysql用户没有足够的权限连接到远程连接服务。为此,我创建了一个新的mysql用户,拥有对远程服务器的完全权限和访问权限(kafka connect)。
完成上述步骤后,重新启动Kafka连接,摄入管道开始工作。

xkftehaa

xkftehaa3#

这似乎是jdbc连接器的问题。你在运行哪个mysql版本?要解决此问题,您需要:
如果您运行的是MySQL8或5.1.47旧版本,请下载connector/j 8.0.13。
将jar文件放在 /usr/share/java/kafka-connect-jdbc/ .
重新启动kafka connect并启动mysql连接器。

相关问题