I have set up my Kafka S3 sink configuration, but I want messages consumed into the S3 bucket in real time as they are produced. So far, when I exec into the pod to test, only the messages I send as a test end up in the bucket. What I actually want is to receive all of the messages being produced on the Kafka processor side.
curl -X POST $host -H "Content-Type: application/json" --data '{
  "name": "'"$connector"'",
  "config": {
    "tasks.max": "9",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "aws.access.key.id": "access-key-id",
    "aws.secret.access.key": "secret-access-key",
    "s3.credentials.provider.class": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    "flush.size": "5",
    "topics": "topic name",
    "s3.region": "us-west-2",
    "s3.bucket.name": "bucket name",
    "s3.part.size": "5242880",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "locale": "en",
    "timezone": "America/Los_Angeles",
    "path.format": "YYYY-MM-dd-HH",
    "partition.duration.ms": "360000",
    "rotate.schedule.interval.ms": "300000",
    "schema.compatibility": "NONE",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "name": "'"$connector"'"
  }
}'
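As a sanity check I also query the Connect REST API after the POST (a minimal sketch, assuming $host points at the worker's /connectors endpoint as in the request above, and $connector is the same name variable):

# List all connectors registered on this worker
curl -s "$host"

# Show the state of this connector and its tasks (RUNNING / FAILED, plus any error trace)
curl -s "$host/$connector/status"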
I run the deployment file with this command:
command:
  - bash
  - -c
  - |
    aws configure set aws_access_key_id access-key-id &
    aws configure set aws_secret_access_key secret-access-key &
    aws configure set default.region us-west-2 &
    /etc/confluent/docker/run &
    bin/connect-distributed.sh config/worker.properties &
    echo "Waiting for Kafka Connect to start listening on localhost"
    while [ $$(curl -s -o /dev/null -w %{http_code} http://localhost:8084/connectors) -eq 000 ] ; do
      echo -e $$(date) " Kafka Connect listener HTTP state: " $$(curl -s -o /dev/null -w %{http_code} http://localhost:8084/connectors) " (waiting for 200)"
      sleep 5
    done
    nc -vz localhost 8084
    bash create-connector.sh
    echo -e "\n--\n+> Created S3 Bucket container"
    sleep infinity
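To see what the sink has actually consumed, I look at its consumer group offsets. This is a sketch, not part of the deployment: a sink connector consumes under the group id connect-<connector name>, and localhost:9092 / the $connector variable are assumptions that may differ in your setup.

# Describe the sink's consumer group to see committed offsets and lag per partition
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group connect-"$connector"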
Is there more config I have to enter?
1 Answer
In config/worker.properties you need to add consumer.auto.offset.reset=earliest to pick up all existing messages. You can also try the same property, prefixed with consumer.override., in the connector config itself.
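Spelled out, the two options look like this (a sketch; depending on the Connect version, the per-connector override may also require connector.client.config.override.policy=All in the worker config):

# Worker level, in config/worker.properties (applies to every sink connector on this worker)
consumer.auto.offset.reset=earliest

# Connector level, added to the "config" block of the curl request in the question
"consumer.override.auto.offset.reset": "earliest"

Note that auto.offset.reset only takes effect when the connector's consumer group (connect-<connector name>) has no committed offsets yet; if the group has already committed offsets, the connector keeps reading from those regardless of this setting.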