我已经使用独立的方法手动安装了confluent kafka connect s3,而不是通过confluent的进程或作为整个平台的一部分。
我可以使用以下命令从命令行成功启动连接器:
./kafka_2.11-2.1.0/bin/connect-standalone.sh connect.properties s3-sink.properties
主题cdc从aws msk的偏移量可以看到正在被消耗。不会抛出错误。但是,在awss3中,没有为新数据创建文件夹结构,也没有存储json数据。
问题
连接器是否应该在看到主题的第一个json包时动态创建文件夹结构?
除了配置awscli凭据、connect.properties和s3-sink.properties之外,是否需要设置其他设置才能正确连接到s3存储桶?
关于安装文档的建议比合流网站上的独立文档更全面((上面链接)
connect.properties属性
bootstrap.servers服务器=redacted:9092,redacted:9092,redacted:9092
plugin.path=/plugins/kafka-connect-s3 key.converter=org.apache.kafka.connect.json.jsonconverter value.converter=org.apache.kafka.connect.json.jsonconverter key.converter.schemas.enable=false value.converter.schemas.enable=false internal.key.converter=org.apache.kafka.connect.json.jsonconverterinternal.value.converter=org.apache.kafka.connect.json.jsonconverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false offset.storage.file.filename=/tmp/connect.offset
s3-Flume属性
name=s3 sink connector.class=io.confluent.connect.s3.s3sinkconnector tasks.max=1 topics=database\u schema\u topic1,database\u schema\u topic2,database\u schema\u topic3 s3.region=us-east-2 s3.bucket.name=databasekafka s3.part.size=5242880 flush.size=1 storage.class=io.confluent.connect.s3.storage.s3storage format.class=io.confluent.connect.s3.format.json.jsonformat schema.generator.class=io.confluent.connect.storage.hive.schema.defaultschemageneratorpartitioner.class=io.confluent.connect.storage.partitioner.defaultpartitioner schema.compatibility=none
1条答案
按热度按时间2nc8po8w1#
连接器是否应该在看到主题的第一个json包时动态创建文件夹结构?是的,甚至可以使用参数“topics.dir”和“path.format”来控制此路径(目录结构)
除了配置awscli凭据、connect.properties和s3-sink.properties之外,是否需要设置其他设置才能正确连接到s3存储桶?默认情况下,s3连接器将通过环境变量或凭据文件使用aws凭据(访问id和密钥)。您可以通过修改参数“s3.credentials.provider.class”进行更改。该参数的默认值为“defaultawscredentialsproviderchain”
关于安装文档的建议比合流网站上的独立文档更全面(我建议您使用分布式模式,因为它为您的connect集群和在其上运行的连接器提供了高可用性。您可以阅读下面的文档来配置分布式模式下的连接集群。https://docs.confluent.io/current/connect/userguide.html#connect-userguide dist worker配置