I am trying to get Kafka Connect to sink multiple messages to an S3 bucket. So far, when I produce multiple records, e.g. {sessionid:1,userid:1,timestamp:2011-10-10}{sessionid:2,userid:2,timestamp:2011-10-10},
and then download the resulting JSON file, the AWS S3 bucket only shows the first record. It says "unable to expand the file", and when I click to view the file it only displays the first message that was produced.
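For context, a minimal way to produce test records like these is sketched below (assuming a local broker on localhost:9092 and the meetups topic used by the connector below; the keys are quoted here so each line is valid JSON, and each line entered at the prompt becomes one Kafka record):

# produce one record per line to the meetups topic
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic meetups
{"sessionid":1,"userid":1,"timestamp":"2011-10-10"}
{"sessionid":2,"userid":2,"timestamp":"2011-10-10"}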
Here is the relevant part of my Dockerfile, which installs python3-pip and the AWS CLI:
RUN pip3 install awscli

My deployment.yaml file:
command:
- bash
- -c
- |
aws configure set aws_access_key_id "$aws_access_key" &
aws configure set aws_secret_access_key "$aws_secret_key" &
aws configure set default.region us-west-2 &
/etc/confluent/docker/run &
bin/connect-distributed.sh config/worker.properties
echo "Waiting for Kafka Connect to start listening on kafka-connect ⏳"
while [ $$(curl -s -o /dev/null -w %{http_code} http://kafka-connect:8083/connectors) -eq 000 ] ; do
echo -e $$(date) " Kafka Connect listener HTTP state: " $$(curl -s -o /dev/null -w %{http_code} http://kafka-connect:8083/connectors) " (waiting for 200)"
sleep 5
done
nc -vz kafka-connect 8083
echo -e "\n--\n+> Creating Kafka Connector(s)"
/scripts/create-connectors.sh # Note: This script is stored externally from container
bash properties.file
sleep infinity
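Once the worker is up, the standard Connect REST API can be used to confirm that the connector and its task are actually running (a quick check, using the kafka-connect host from the wait loop above and the connector name from the config below):

# list registered connectors
curl -s http://kafka-connect:8083/connectors
# show the status of the S3 sink connector and its tasks
curl -s http://kafka-connect:8083/connectors/meetups-to-s3/status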
The properties file:
{
"name":"meetups-to-s3",
"config":{
"_comment": "The S3 sink connector class",
"connector.class":"io.confluent.connect.s3.S3SinkConnector",
"_comment": "The total number of Connect tasks to spawn (with implicit upper limit the number of topic-partitions)",
"tasks.max":"1",
"_comment": "Which topics to export to S3",
"topics":"meetups",
"_comment": "The S3 bucket that will be used by this connector instance",
"s3.bucket.name":"meetups",
"_comment": "The AWS region where the S3 bucket is located",
"s3.region":"us-west-2",
"_comment": "The size in bytes of a single part in a multipart upload. The last part is of s3.part.size bytes or less. This property does not affect the total size of an S3 object uploaded by the S3 connector",
"s3.part.size":"5242880",
"_comment": "The maximum number of Kafka records contained in a single S3 object. Here a high value to allow for time-based partition to take precedence",
"flush.size":"100000",
"_comment": "Kafka Connect converter used to deserialize keys (unused in this example)",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"false",
"_comment": "Kafka Connect converter used to deserialize values",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"false",
"_comment": "The type of storage for this storage cloud connector",
"storage.class":"io.confluent.connect.s3.storage.S3Storage",
"_comment": "The storage format of the objects uploaded to S3",
"format.class":"io.confluent.connect.s3.format.json.JsonFormat",
"_comment": "Schema compatibility mode between records with schemas (Useful when used with schema-based converters. Unused in this example, listed for completeness)",
"schema.compatibility":"NONE",
"_comment": "The class used to partition records in objects to S3. Here, partitioning based on time is used.",
"partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"_comment": "The locale used by the time-based partitioner to encode the date string",
"locale":"en",
"_comment": "Setting the timezone of the timestamps is also required by the time-based partitioner",
"timezone":"UTC",
"_comment": "The date-based part of the S3 object key",
"path.format":"'date'=YYYY-MM-dd/'hour'=HH",
"_comment": "The duration that aligns with the path format defined above",
"partition.duration.ms":"3600000",
"_comment": "The interval between timestamps that is sufficient to upload a new object to S3. Here a small interval of 1min for better visualization during the demo",
"rotate.interval.ms":"60000",
"_comment": "The class to use to derive the timestamp for each record. Here Kafka record timestamps are used",
"timestamp.extractor":"Record"
}
}
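For reference, a JSON document in this shape (name plus config) is normally registered by POSTing it to the Connect REST API. The create-connectors.sh script is stored outside the container, so this is only a sketch of what that step would look like with the file above:

curl -s -X POST -H "Content-Type: application/json" \
     --data @properties.file \
     http://kafka-connect:8083/connectors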
So my question is: is there a way for Kafka Connect to write multiple messages into a single file, since a topic can contain many messages (similar to a multipart upload)? Is anything missing from the deployment file or the properties file? Thanks, and please comment if you have any questions.