sf\u kafka\u连接器名称为空或使用合流云和雪花kafka连接器时出错

a0x5cqrl  于 2021-06-04  发布在  Kafka
关注(0)|答案(3)|浏览(286)

我有一个运行在合流云中的集群,能够使用其他应用程序生成和使用数据。但是,当我尝试连接雪花Kafka连接器时,我收到以下错误:

[2019-10-15 22:12:08,979] INFO Creating connector source-snowflake of type com.snowflake.kafka.connector.SnowflakeSinkConnector (org.apache.kafka.connect.runtime.Worker)
[2019-10-15 22:12:08,983] INFO Instantiated connector source-snowflake with version 0.5.1 of type class com.snowflake.kafka.connector.SnowflakeSinkConnector (org.apache.kafka.connect.runtime.Worker)
[2019-10-15 22:12:08,986] INFO 
[SF_KAFKA_CONNECTOR] Snowflake Kafka Connector Version: 0.5.1 (com.snowflake.kafka.connector.Utils)
[2019-10-15 22:12:09,029] INFO 
[SF_KAFKA_CONNECTOR] SnowflakeSinkConnector:start (com.snowflake.kafka.connector.SnowflakeSinkConnector)
[2019-10-15 22:12:09,030] ERROR 
[SF_KAFKA_CONNECTOR] name is empty or invalid. It should match Snowflake object identifier syntax. Please see the documentation. (com.snowflake.kafka.connector.Utils)
[2019-10-15 22:12:09,033] ERROR WorkerConnector{id=source-snowflake} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector)
com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException: 
[SF_KAFKA_CONNECTOR] Exception: Invalid input connector configuration
[SF_KAFKA_CONNECTOR] Error Code: 0001
[SF_KAFKA_CONNECTOR] Detail: input kafka connector configuration is null, missing required values, or wrong input value
at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:347)
at com.snowflake.kafka.connector.internal.SnowflakeErrors.getException(SnowflakeErrors.java:306)
at com.snowflake.kafka.connector.Utils.validateConfig(Utils.java:400)
at com.snowflake.kafka.connector.SnowflakeSinkConnector.start(SnowflakeSinkConnector.java:131)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:252)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1079)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:117)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1095)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1091)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

这是我的清理雪花配置文件:

{
  "name":"snowsink",
  "config":{
    "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max":"8",
    "topics":"tp-snow-test",
    "buffer.count.records":"100",
    "buffer.flush.time":"60",
    "buffer.size.bytes":"65536",
    "snowflake.url.name":"xxxxxxx.east-us-2.azure.snowflakecomputing.com",
    "snowflake.user.name":"svc_cc_strm",
    "snowflake.private.key":"<key>",
    "snowflake.private.key.passphrase":<password>,
    "snowflake.database.name":"testdb",
    "snowflake.schema.name":"test1",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
   }
}

有什么想法吗?谢谢。

lhcgjxsq

lhcgjxsq1#

您需要更改连接器的名称( source-snow )移除 - 从它(以便它匹配此验证模式)。
?‍♂️

0qx6xfy6

0qx6xfy62#

连接器的名称应该是snowflake的有效sql标识符。很多Kafka主题的例子中都有破折号,当我第一次尝试雪花Kafka连接器时,我也遇到了同样的错误。
根据文档,雪花管道是使用指定的连接器名称创建的,管道名称必须是有效的sql标识符。
连接器为每个主题分区创建一个管道。名称为:
雪花\uKafka\u连接器\u管道\u。
同样来自同一文档页的“配置文件中的字段”作为名称:
应用程序名称。这在客户使用的所有kafka连接器中必须是唯一的。此名称必须是有效的雪花无引号标识符。
如果主题中有破折号,则需要将其Map到一个表名,该表名也是连接器配置中正确的sql标识符,否则它将尝试创建与主题名相同的表名,并在名称中的“-”处失败。

yb3bgrhw

yb3bgrhw3#

你需要在你的配置文件中有下面的条目,下面的主题条目。
“topics”:“tp雪地测试”,
“snowflake.topic2table.map”:“tp雪-test:testkafkatable",

相关问题