我有一个运行在合流云中的集群,能够使用其他应用程序生成和使用数据。但是,当我尝试连接雪花Kafka连接器时,我收到以下错误:
[2019-10-15 22:12:08,979] INFO Creating connector source-snowflake of type com.snowflake.kafka.connector.SnowflakeSinkConnector (org.apache.kafka.connect.runtime.Worker)
[2019-10-15 22:12:08,983] INFO Instantiated connector source-snowflake with version 0.5.1 of type class com.snowflake.kafka.connector.SnowflakeSinkConnector (org.apache.kafka.connect.runtime.Worker)
[2019-10-15 22:12:08,986] INFO
[SF_KAFKA_CONNECTOR] Snowflake Kafka Connector Version: 0.5.1 (com.snowflake.kafka.connector.Utils)
[2019-10-15 22:12:09,029] INFO
[SF_KAFKA_CONNECTOR] SnowflakeSinkConnector:start (com.snowflake.kafka.connector.SnowflakeSinkConnector)
[2019-10-15 22:12:09,030] ERROR
[SF_KAFKA_CONNECTOR] name is empty or invalid. It should match Snowflake object identifier syntax. Please see the documentation. (com.snowflake.kafka.connector.Utils)
[2019-10-15 22:12:09,033] ERROR WorkerConnector{id=source-snowflake} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector)
com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException:
[SF_KAFKA_CONNECTOR] Exception: Invalid input connector configuration
[SF_KAFKA_CONNECTOR] Error Code: 0001
[SF_KAFKA_CONNECTOR] Detail: input kafka connector configuration is null, missing required values, or wrong input value
at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:347)
at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:306)
at com.snowflake.kafka.connector.Utils.validateConfig(Utils.java:400)
at com.snowflake.kafka.connector.SnowflakeSinkConnector.start(SnowflakeSinkConnector.java:131)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:252)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1079)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:117)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1095)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1091)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
这是我的清理雪花配置文件:
{
"name":"snowsink",
"config":{
"connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max":"8",
"topics":"tp-snow-test",
"buffer.count.records":"100",
"buffer.flush.time":"60",
"buffer.size.bytes":"65536",
"snowflake.url.name":"xxxxxxx.east-us-2.azure.snowflakecomputing.com",
"snowflake.user.name":"svc_cc_strm",
"snowflake.private.key":"<key>",
"snowflake.private.key.passphrase":<password>,
"snowflake.database.name":"testdb",
"snowflake.schema.name":"test1",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
}
}
有什么想法吗?谢谢。
3条答案
按热度按时间lhcgjxsq1#
您需要更改连接器的名称(
source-snow
)移除-
从它(以便它匹配此验证模式)。?♂️
0qx6xfy62#
连接器的名称应该是snowflake的有效sql标识符。很多Kafka主题的例子中都有破折号,当我第一次尝试雪花Kafka连接器时,我也遇到了同样的错误。
根据文档,雪花管道是使用指定的连接器名称创建的,管道名称必须是有效的sql标识符。
连接器为每个主题分区创建一个管道。名称为:
雪花\uKafka\u连接器\u管道\u。
同样来自同一文档页的“配置文件中的字段”作为名称:
应用程序名称。这在客户使用的所有kafka连接器中必须是唯一的。此名称必须是有效的雪花无引号标识符。
如果主题中有破折号,则需要将其Map到一个表名,该表名也是连接器配置中正确的sql标识符,否则它将尝试创建与主题名相同的表名,并在名称中的“-”处失败。
yb3bgrhw3#
你需要在你的配置文件中有下面的条目,下面的主题条目。
“topics”:“tp雪地测试”,
“snowflake.topic2table.map”:“tp雪-test:testkafkatable",