I am using the Confluent connector for Kafka and Google Cloud Storage (GCS). I am doing this on Ubuntu 16.04, and all of the commands below were run from the home directory. I installed Confluent with:
curl https://packages.confluent.io/archive/5.0/confluent-5.0.0-2.11.tar.gz | tar xz
Then I installed the Confluent Hub client with:
curl http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz | tar xz
Then I installed the Kafka GCS connector with:
confluent-hub install confluentinc/kafka-connect-gcs:5.0.1
Then I exported the path with:
export PATH=<insert-path-here>/confluent-5.0.0/bin:$PATH
Then I started Confluent with:
confluent start
It printed:
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /tmp/confluent.pc9RaNNQ
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]
Checking the Confluent status with:
confluent status
gave me:
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
control-center is [UP]
ksql-server is [UP]
connect is [UP]
kafka-rest is [UP]
schema-registry is [UP]
kafka is [UP]
zookeeper is [UP]
So far everything looks fine: ZooKeeper and Kafka are both running. However, when I load the Kafka GCS connector with the following command:
confluent load gcs -d quickstart-gcs.properties
it gives the following error:
Failed to find any class that implements Connector and which name
matches io.confluent.connect.gcs.GcsSinkConnector
The quickstart-gcs.properties file is in the home directory and looks like this:
name=gcs-sink
connector.class=io.confluent.connect.gcs.GcsSinkConnector
tasks.max=1
topics=test
gcs.bucket.name=test_bucket
gcs.part.size=5242880
flush.size=3
gcs.credentials.path=/path/to/kafka/key.json
storage.class=io.confluent.connect.gcs.storage.GcsStorage
format.class=io.confluent.connect.gcs.format.avro.AvroFormat
partitioner.class=io.confluent.connect.storage.partitioner
.DefaultPartitioner
schema.compatibility=NONE
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
confluent.license=
The lines:
partitioner.class=io.confluent.connect.storage.partitioner
.DefaultPartitioner
are actually a single line in the file:
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
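I had to split it here because of the editor's limitations. Sorry about that.
According to the documentation, the last line of quickstart-gcs.properties should actually be: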
confluent.license=""
but that did not work for me.
Any help would be greatly appreciated.
Edit 1: Following @cricket_007's comment:
curl http://localhost:8083/connector-plugins
gives:
[{"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type":"sink","version":"5.0.0"},{"class":"io.confluent.connect.hdfs.HdfsSinkConnector","type":"sink","version":"5.0.0"},{"class":"io.confluent.connect.hdfs.tools.SchemaSourceConnector","type":"source","version":"2.0.0-cp1"},{"class":"io.confluent.connect.jdbc.JdbcSinkConnector","type":"sink","version":"5.0.0"},{"class":"io.confluent.connect.jdbc.JdbcSourceConnector","type":"source","version":"5.0.0"},{"class":"io.confluent.connect.s3.S3SinkConnector","type":"sink","version":"5.0.0"},{"class":"io.confluent.connect.storage.tools.SchemaSourceConnector","type":"source","version":"2.0.0-cp1"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.0.0-cp1"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.0.0-cp1"}]
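Note that the class named in the error message, io.confluent.connect.gcs.GcsSinkConnector, does not appear anywhere in this list, so the Connect worker has not loaded the GCS plugin at all. For anyone who wants to repeat that check without reading the JSON by eye, here is a minimal sketch. `has_plugin` is a hypothetical helper name of my own, not part of the Confluent CLI, and the sample input is a shortened copy of the listing above.

```shell
#!/usr/bin/env bash
# has_plugin is a hypothetical helper (not part of any Confluent tool).
# It reads the JSON returned by /connector-plugins on stdin and succeeds
# only if the given connector class name appears in it as a quoted string.
has_plugin() {
  # -q: no output, -i: ignore case, -F: match the name as a fixed string
  grep -qiF "\"$1\""
}

# Assumption: a trimmed copy of the plugin listing, used here as sample input.
plugins='[{"class":"io.confluent.connect.s3.S3SinkConnector","type":"sink","version":"5.0.0"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.0.0-cp1"}]'

if printf '%s' "$plugins" | has_plugin "io.confluent.connect.gcs.GcsSinkConnector"; then
  gcs_status="loaded"
else
  gcs_status="missing"
fi
echo "GcsSinkConnector: $gcs_status"
```

Against the running worker, the same helper can presumably be fed from the REST call directly: curl -s http://localhost:8083/connector-plugins | has_plugin io.confluent.connect.gcs.GcsSinkConnector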