尝试将实时kafka消息摄取到s3存储桶中

lvmkulzt  于 2021-07-15  发布在  Kafka
关注(0)|答案(1)|浏览(298)

我已经设置了我的kafkas3配置,但是我试图在生成消息时将实时消息消费到s3存储桶中。到目前为止,当我进入吊舱进行测试时,只有我测试的消息被输入。但是,我正在尝试接收Kafka处理器端产生的所有消息

curl -X POST $host -H "Content-Type: application/json" --data '{
  "name": "'"$connector"'",
  "config": {
  "tasks.max":"9",
  "connector.class":"io.confluent.connect.s3.S3SinkConnector",
  "aws.access.key.id":"access-key-id",
  "aws.secret.access.key":"secret-access-key",
  "s3.credentials.provider.class":"com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
  "flush.size":"5",
 "topics":"topic name",
  "s3.region": "us-west-2",
  "s3.bucket.name": "bucket name",
  "s3.part.size": "5242880",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "locale":"en",
  "timezone":"America/Los_Angeles",
  "path.format":"YYYY-MM-dd-HH",
  "partition.duration.ms":"360000",
  "rotate.schedule.interval.ms":"300000",
  "schema.compatibility": "NONE",
  "errors.tolerance": "all",
  "errors.log.enable":true,
  "errors.log.include.messages":true,
  "name": "'"$connector"'"
  }
  }'

我用这个命令运行部署文件

command:
  - bash
  - -c
  - |
  aws configure set aws_access_key_id access-key-id &
  aws configure set aws_secret_access_key secret-access-key &
  aws configure set default.region us-west-2 &
  /etc/confluent/docker/run &
  bin/connect-distributed.sh config/worker.properties &
  echo "Waiting for Kafka Connect to start listening on localhost"
  while [ $$(curl -s -o /dev/null -w %{http_code} http://localhost:8084/connectors) -eq 000 ] ; do
  echo -e $$(date) " Kafka Connect listener HTTP state: " $$(curl -s -o /dev/null -w %{http_code} http://localhost:8084/connectors) " (waiting for 200)"
  sleep 5
  done
  nc -vz localhost 8084
  bash create-connector.sh
  echo -e "\n--\n+> Created S3 Bucket container"
  sleep infinity

Is there more config i have to enter
yruzcnhs

yruzcnhs1#

config/worker.properties ,您需要添加 consumer.auto.offset.reset=earliest 接收所有现有邮件
也可以尝试相同的属性,但前缀为 consumer.override 在连接器配置本身中

相关问题