How do I connect Kafka to MySQL?

Posted by wztqucjr on 2021-06-06 in Kafka

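I am trying to connect Kafka to MySQL on Windows. I am not using Confluent. My Kafka version is 2.12. I have started ZooKeeper, Kafka, a producer, and a consumer, and they all work fine.
My MySQL version is 8.0.15.
I copied these 3 JAR files into the libs folder: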

mysql-connector-java-8.0.15.jar
mysql-connector-java-5.1.47.jar
mysql-connector-java-5.1.47-bin.jar

My source-quickstart-mysql.properties file is:

name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/databasename?user=rootname&password=password
mode=incrementing
incrementing.column.name=ID
topic.prefix=my-replicated-topic-table1

When I run the command

connect-standalone.bat ..\..\config\connect-standalone.properties  ..\..\config\source-quickstart-mysql.properties

I get this error on the console:
[2019-03-26 16:16:39,524] ERROR Failed to create job for ..\config\source-quickstart-mysql.properties (org.apache.kafka.connect.cli.ConnectStandalone)
[2019-03-26 16:16:39,524] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches io.confluent.connect.jdbc.JdbcSourceConnector, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='2.1.0', encodedVersion=2.1.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='2.1.0', encodedVersion=2.1.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='2.1.0', encodedVersion=2.1.0, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='2.1.0', encodedVersion=2.1.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='2.1.0', encodedVersion=2.1.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='2.1.0', encodedVersion=2.1.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='2.1.0', encodedVersion=2.1.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='2.1.0', encodedVersion=2.1.0, type=source, typeName='source', location='classpath'}
    at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
    at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:110)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches io.confluent.connect.jdbc.JdbcSourceConnector, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='2.1.0', encodedVersion=2.1.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='2.1.0', encodedVersion=2.1.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='2.1.0', encodedVersion=2.1.0, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='2.1.0', encodedVersion=2.1.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='2.1.0', encodedVersion=2.1.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='2.1.0', encodedVersion=2.1.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='2.1.0', encodedVersion=2.1.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='2.1.0', encodedVersion=2.1.0, type=source, typeName='source', location='classpath'}
    at org.apache.kafka.connect.runtime.isolation.Plugins.newConnector(Plugins.java:179)
    at org.apache.kafka.connect.runtime.AbstractHerder.getConnector(AbstractHerder.java:382)
    at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:261)
    at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:189)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
Please help me.
I also tried this article, but the command bin/confluent load jdbc-source -d jdbc-source.properties produced no output: https://supergloo.com/kafka-connect/kafka-connect-mysql-example/

qgzx9mmu  #1

Your error is:

org.apache.kafka.connect.errors.ConnectException: Failed to find any class that 
 implements Connector and which name matches io.confluent.connect.jdbc.JdbcSourceConnector

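Since you say you are not using Confluent Platform, this makes sense, because kafka-connect-jdbc is not part of Apache Kafka. You can use Confluent Platform, build the connector from source, or get it from http://hub.confluent.io.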

r8uurelv  #2

If you add kafka-connect-jdbc-5.5.1.jar to the Kafka libs path and restart ZooKeeper and the Kafka server, you should be able to connect.

uqxowvwt  #3

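You can download kafka-connect-jdbc from https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc. It is free and does not require Confluent Platform. After unzipping it, set the plugin.path key in connect-standalone.properties (found in the config directory of your Apache Kafka installation) to the location of confluentinc-kafka-connect-jdbc-5.5.1.jar. Run the script again and the error will be gone.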
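
As a rough sketch of the plugin.path step in this answer, the worker file config/connect-standalone.properties might contain a line like the one below. The directory C:/kafka/plugins is a hypothetical location chosen for illustration, on the assumption that the download was unzipped into a subfolder of it; adjust it to wherever the connector actually lives. Forward slashes are used because a backslash acts as an escape character in Java properties files.

```properties
# config/connect-standalone.properties (worker configuration)
# Assumption: the connector archive was unzipped to
#   C:/kafka/plugins/confluentinc-kafka-connect-jdbc-5.5.1/   (hypothetical path)
# plugin.path is a comma-separated list of directories that Connect scans for plugins.
plugin.path=C:/kafka/plugins
```

After restarting connect-standalone.bat, io.confluent.connect.jdbc.JdbcSourceConnector should appear in the list of available connectors if the path is right. If the connector loads but then reports that it cannot find a suitable JDBC driver, it may help to place mysql-connector-java-8.0.15.jar next to the kafka-connect-jdbc JAR inside the unzipped folder; treat that as something to verify against your own installation.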
