Hi, I'm trying to import all tables from a remote SQL Server into KSQL topics. This is my properties file:
connector.class=io.confluent.connect.cdc.mssql.MsSqlSourceConnector
name=sqlservertest
tasks.max=1
initial.database=$$DATABASE
connection.url=jdbc:sqlserver://$$IP:1433;databaseName=$$DATABASE;user=$$USER;
username=$$USER
password=$$PASS
server.name=$$IP
server.port=1433
topic.prefix=sqlservertest
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
mode=bulk
auto.create=true
auto.evolve=true
Then I load it with:
confluent load sqlservertest -d /opt/kakfkaconf/sqlservertest.properties
In the log:
confluent log connect -f
it shows:
[2018-10-10 14:18:43,856] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:868)
It runs without errors, but it doesn't import anything and the topics stay empty:
confluent status sqlservertest
{
"name": "sqlservertest",
"connector": {
"state": "RUNNING",
"worker_id": "10.132.0.2:8083"
},
"tasks": [],
"type": "source"
}
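The status output above reports the connector as RUNNING but with an empty "tasks" list, which fits with nothing being written to the topics. A minimal sketch of checking for that case, assuming the status JSON has the shape shown above; `running_without_tasks` is a hypothetical helper name, not part of the Confluent CLI or Kafka Connect:

```python
import json

# Status JSON copied from the output above.
STATUS = """
{
  "name": "sqlservertest",
  "connector": {"state": "RUNNING", "worker_id": "10.132.0.2:8083"},
  "tasks": [],
  "type": "source"
}
"""

def running_without_tasks(status_json: str) -> bool:
    """Return True when the connector is RUNNING but no tasks were started."""
    status = json.loads(status_json)
    connector_state = status.get("connector", {}).get("state")
    tasks = status.get("tasks", [])
    return connector_state == "RUNNING" and len(tasks) == 0

if __name__ == "__main__":
    print(running_without_tasks(STATUS))
```

A connector in this state has been accepted by the worker but has not started any task that could poll the database, so the connector-level RUNNING state alone does not show that data is flowing.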
I have also tried these properties:
name=mssql
connector.class=io.confluent.connect.cdc.mssql.MsSqlSourceConnector
tasks.max=2
initial.database=$$DB
username=$$USER
password=$$PASS
server.name=$$IP
server.port=1433
change.tracking.tables=$$SCHEMA.$$TABLE
auto.create=true
auto.evolve=true
topic.prefix=$$DB
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
But I get this error:
[2018-10-10 15:06:09,216] ERROR Exception thrown while querying for ChangeKey{databaseName=$$DB, schemaName=$$SCHEMA, tableName=$$TABLE} (io.confluent.connect.cdc.mssql.QueryService:94)
org.apache.kafka.connect.errors.DataException: Exception thrown while getting metadata for ChangeKey{databaseName=$$DB, schemaName=$$SCHEMA, tableName=$$TABLE}
at io.confluent.connect.cdc.CachingTableMetadataProvider.tableMetadata(CachingTableMetadataProvider.java:64)
at io.confluent.connect.cdc.mssql.QueryService.queryTable(QueryService.java:108)
at io.confluent.connect.cdc.mssql.QueryService.processTables(QueryService.java:92)
at io.confluent.connect.cdc.mssql.QueryService.run(QueryService.java:67)
at com.google.common.util.concurrent.AbstractExecutionThreadService$1$2.run(AbstractExecutionThreadService.java:60)
at com.google.common.util.concurrent.Callables$3.run(Callables.java:95)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near '='.
at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
... 6 more
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near '='.
at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:259)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1547)
... 11 more
1 answer
u4vypkhs #1
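I found the real cause of this error: the Kafka connector uses functions that were introduced in MS SQL Server 2012, specifically IIF and boolean comparisons passed as function arguments,
which do not work on MS SQL Server 2008.
In other words, the Confluent MSSQL connector is designed for MS SQL Server 2012 and later, and I am running version 2008.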
I decompiled the kafka-connect-cdc-mssql library and adjusted the SQL code to be compatible with SQL Server 2008, and now it works.
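To illustrate the kind of change involved, here is a rough sketch, not the connector's actual code. IIF was added in SQL Server 2012 (major version 11), so on 2008 (major version 10) a call like IIF(a = b, ...) is parsed as an ordinary function call and the comparison inside it produces "Incorrect syntax near '='". The equivalent CASE expression works on both. The two queries below are made-up examples, and `supports_iif` and `pick_query` are hypothetical helper names; the version string is assumed to come from SELECT SERVERPROPERTY('ProductVersion').

```python
# Illustrative queries only; these are NOT taken from the connector's source.
IIF_FORM = "SELECT IIF(c.is_nullable = 1, 'YES', 'NO') FROM sys.columns AS c"
CASE_FORM = (
    "SELECT CASE WHEN c.is_nullable = 1 THEN 'YES' ELSE 'NO' END "
    "FROM sys.columns AS c"
)

def supports_iif(product_version: str) -> bool:
    """IIF exists from SQL Server 2012 on, i.e. major version 11 and above."""
    major = int(product_version.split(".")[0])
    return major >= 11

def pick_query(product_version: str) -> str:
    """Use the CASE form on servers that predate IIF."""
    return IIF_FORM if supports_iif(product_version) else CASE_FORM

if __name__ == "__main__":
    # A 10.x version string belongs to the SQL Server 2008 family.
    print(pick_query("10.50.1600.1"))
    # An 11.x version string is SQL Server 2012.
    print(pick_query("11.0.2100.60"))
```

Since the CASE form is valid on 2012 and later as well, a patched library could simply use it everywhere rather than branching on the version.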
Maybe I'll push it to GitHub so that everyone can use it.