我对kafka还很陌生,我正在尝试用一个mysql源连接器和一个elasticsearch+elasticsearch sink连接器建立一个简单的kafka连接系统并运行;用于基本数据流目的。
我是按照https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1/ &它的第2部分(我已经验证了es的工作原理是在源代码端有一个简单的生产者。)
除了mysql源连接器之外,其他一切都按预期配置和工作。我尝试这一切的虚拟机上没有安装mysql服务器。教程的dbms部分我使用客户机来创建/更改和处理表。因此,在源属性中,我尝试:
"connection.url": "jdbc:mysql://IPaddressofDB:3306/DBname?user=uname&password=pwd"
"table.whitelist": "tablename"
为了启动连接器,我只做了一个 ./confluent load connector-name
当我加载源连接器并检查其状态时,它会给出一个错误
"org.apache.kafka.connect.errors.ConnectException: Failed trying to validate that columns used for offsets are NOT NULL\n\t ...
Caused by: java.sql.SQLSyntaxErrorException: Table 'admin_portal.tablename' doesn't exist\n\t
这是对的吗?我完全错过了什么吗?
如何为我正在尝试的情况指定connection.url:您正在尝试连接到不同的db服务器?几乎所有的例子/git问题等似乎都只指定localhost。
我不知道在哪里 admin_portal
是从哪里来的,我根本没说
****为@robin moffat的建议编辑(似乎给出了与以前相同的错误)
sourceconfig.json文件:
{
"name": "jdbc_source_mysql_new",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://ipaddress:3306/dbname?user=uname&password=pwd",
"table.whitelist": "dbname.tablename",
"topic.prefix": "mysql-new-",
"mode":"incrementing",
"incrementing.column.name": "colname"
}
}
已加载连接器:
>curl -X POST -H "Content-Type: application/json" --data @sourceconfig.json http://localhost:8083/connectors
检查接头的状态:
>curl -X GET localhost:8083/connectors/jdbc_source_mysql_new/tasks/0/status
{"state":"FAILED","
"trace":
"org.apache.kafka.connect.errors.ConnectException: Failed trying to validate that columns used for offsets are NOT NULL\n\t
at io.confluent.connect.jdbc.source.JdbcSourceTask.validateNonNullable(JdbcSourceTask.java:400)\n\t
at io.confluent.connect.jdbc.source.JdbcSourceTask.start(JdbcSourceTask.java:156)\n\t
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)\n\t
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\t
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\t
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\t
at java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\t
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\t
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\t
at java.lang.Thread.run(Thread.java:748)\n
Caused by: java.sql.SQLSyntaxErrorException: Table 'admin_portal.tablename' doesn't exist\n\t
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)\n\t
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)\n\t
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)\n\t
at com.mysql.cj.jdbc.StatementImpl.executeQuery(StatementImpl.java:1218)\n\t
at com.mysql.cj.jdbc.DatabaseMetaData$7.forEach(DatabaseMetaData.java:2950)\n\t
at com.mysql.cj.jdbc.DatabaseMetaData$7.forEach(DatabaseMetaData.java:2938)\n\t
at com.mysql.cj.jdbc.IterateBlock.doForAll(IterateBlock.java:56)\n\t
at com.mysql.cj.jdbc.DatabaseMetaData.getPrimaryKeys(DatabaseMetaData.java:2991)\n\t
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.primaryKeyColumns(GenericDatabaseDialect.java:696)\n\t
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.describeColumns(GenericDatabaseDialect.java:533)\n\t
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.describeColumns(GenericDatabaseDialect.java:513)\n\t
at io.confluent.connect.jdbc.source.JdbcSourceTask.validateNonNullable(JdbcSourceTask.java:369)\n\t... 9 more\n",}
2条答案
按热度按时间qnyhuwrf1#
在我将我的sql连接器版本从8.x降到5.1.47并将其放在正确的$classpath中之后,它就工作了
bvjxkvbb2#
kafka jdbc mysql源连接器是否需要在localhost上安装mysql服务器?
不,它使用jdbc,可以连接到远程示例上的服务器。
这是对的吗?我完全错过了什么吗?
从你所描述的情况来看,你是对的:)
如何为我正在尝试的情况指定connection.url:您正在尝试连接到不同的db服务器?几乎所有的例子/git问题等似乎都只指定localhost。
你可以在这里看到一个例子
您需要正确配置jdbc url,mysql的语法可以在这里找到。
我不知道管理员门户是从哪里来的,我根本没有指定任何地方
这将取决于与您连接到数据库的用户的权限。您需要确保它可以访问要从中读取数据的表。您还可以限定表名,例如。