kafka jdbc mysql源连接器是否需要在localhost上安装mysql服务器?

qco9c6ql  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(351)

我对kafka还很陌生,我正在尝试用一个mysql源连接器和一个elasticsearch+elasticsearch sink连接器建立一个简单的kafka连接系统并运行;用于基本数据流目的。
我是按照https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1/ &它的第2部分(我已经验证了es的工作原理是在源代码端有一个简单的生产者。)
除了mysql源连接器之外,其他一切都按预期配置和工作。我尝试这一切的虚拟机上没有安装mysql服务器。教程的dbms部分我使用客户机来创建/更改和处理表。因此,在源属性中,我尝试:

"connection.url": "jdbc:mysql://IPaddressofDB:3306/DBname?user=uname&password=pwd"
"table.whitelist": "tablename"

为了启动连接器,我只做了一个 ./confluent load connector-name 当我加载源连接器并检查其状态时,它会给出一个错误

"org.apache.kafka.connect.errors.ConnectException: Failed trying to validate that columns used for offsets are NOT NULL\n\t ...
 Caused by: java.sql.SQLSyntaxErrorException: Table 'admin_portal.tablename' doesn't exist\n\t

这是对的吗?我完全错过了什么吗?
如何为我正在尝试的情况指定connection.url:您正在尝试连接到不同的db服务器?几乎所有的例子/git问题等似乎都只指定localhost。
我不知道在哪里 admin_portal 是从哪里来的,我根本没说

****为@robin moffat的建议编辑(似乎给出了与以前相同的错误)

sourceconfig.json文件:

{
        "name": "jdbc_source_mysql_new",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:mysql://ipaddress:3306/dbname?user=uname&password=pwd",
                "table.whitelist": "dbname.tablename",
                "topic.prefix": "mysql-new-",
                "mode":"incrementing",
                "incrementing.column.name": "colname"
                }
}

已加载连接器:

>curl -X POST -H "Content-Type: application/json" --data @sourceconfig.json http://localhost:8083/connectors

检查接头的状态:

>curl -X GET localhost:8083/connectors/jdbc_source_mysql_new/tasks/0/status

  {"state":"FAILED","
     "trace": 
     "org.apache.kafka.connect.errors.ConnectException: Failed trying to validate that columns used for offsets are NOT NULL\n\t
     at io.confluent.connect.jdbc.source.JdbcSourceTask.validateNonNullable(JdbcSourceTask.java:400)\n\t
     at io.confluent.connect.jdbc.source.JdbcSourceTask.start(JdbcSourceTask.java:156)\n\t
     at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)\n\t
     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\t
     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\t
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\t
     at java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\t
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\t
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\t
 at java.lang.Thread.run(Thread.java:748)\n

 Caused by: java.sql.SQLSyntaxErrorException: Table 'admin_portal.tablename' doesn't exist\n\t
 at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)\n\t
 at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)\n\t
 at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)\n\t
 at com.mysql.cj.jdbc.StatementImpl.executeQuery(StatementImpl.java:1218)\n\t
 at com.mysql.cj.jdbc.DatabaseMetaData$7.forEach(DatabaseMetaData.java:2950)\n\t
 at com.mysql.cj.jdbc.DatabaseMetaData$7.forEach(DatabaseMetaData.java:2938)\n\t
 at com.mysql.cj.jdbc.IterateBlock.doForAll(IterateBlock.java:56)\n\t
 at com.mysql.cj.jdbc.DatabaseMetaData.getPrimaryKeys(DatabaseMetaData.java:2991)\n\t
 at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.primaryKeyColumns(GenericDatabaseDialect.java:696)\n\t
 at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.describeColumns(GenericDatabaseDialect.java:533)\n\t
 at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.describeColumns(GenericDatabaseDialect.java:513)\n\t
 at io.confluent.connect.jdbc.source.JdbcSourceTask.validateNonNullable(JdbcSourceTask.java:369)\n\t... 9 more\n",}
qnyhuwrf

qnyhuwrf1#

在我将我的sql连接器版本从8.x降到5.1.47并将其放在正确的$classpath中之后,它就工作了

mysql-connector-java-5.1.47.jar
bvjxkvbb

bvjxkvbb2#

kafka jdbc mysql源连接器是否需要在localhost上安装mysql服务器?
不,它使用jdbc,可以连接到远程示例上的服务器。
这是对的吗?我完全错过了什么吗?
从你所描述的情况来看,你是对的:)
如何为我正在尝试的情况指定connection.url:您正在尝试连接到不同的db服务器?几乎所有的例子/git问题等似乎都只指定localhost。
你可以在这里看到一个例子
您需要正确配置jdbc url,mysql的语法可以在这里找到。
我不知道管理员门户是从哪里来的,我根本没有指定任何地方
这将取决于与您连接到数据库的用户的权限。您需要确保它可以访问要从中读取数据的表。您还可以限定表名,例如。

"table.whitelist": "schema.tablename"

相关问题