从S3到Kafka使用Apache Camel源代码

lndjwyie  于 2022-11-07  发布在  Apache
关注(0)|答案(1)|浏览(166)

我想从amazon-s3读取数据到Kafka。我找到了camel-aws-s3-kafka-connector源代码,我尝试使用它,它工作,但...我想从s3读取数据,而不删除文件,但对每个消费者执行一次,没有重复。是否可以只使用配置文件来完成此操作?我已经创建了如下文件:

name=CamelSourceConnector
connector.class=org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter

camel.source.maxPollDuration=10000

topics=ReadTopic

# prefix=WriteTopic

camel.source.endpoint.prefix=full/path/to/WriteTopic2

camel.source.path.bucketNameOrArn=BucketName
camel.source.endpoint.autocloseBody=false

camel.source.endpoint.deleteAfterRead=false

camel.sink.endpoint.region=xxxx
camel.component.aws-s3.accessKey=xxxx
camel.component.aws-s3.secretKey=xxxx

此外,使用上述配置,我无法仅从“WriteTopic”读取,但可以从s3中的所有文件夹读取,是否还可以配置?S3Bucket folders with files

ddhy6vgd

ddhy6vgd1#

我找到了解决重复问题的方法,我不完全确定这是最好的方法,但它可能会帮助一些人。我的方法描述如下:https://camel.apache.org/blog/2020/12/CKC-idempotency-070/。我使用了camel.idempotency.repository。type=memory,我的配置文件如下所示:

name=CamelAWS2S3SourceConnector connector.class=org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
camel.source.maxPollDuration=10000
topics=ReadTopic

# scieżka z ktorej czytamy dane

camel.source.endpoint.prefix=full/path/to/topic/prefix
camel.source.path.bucketNameOrArn="Bucket name"
camel.source.endpoint.deleteAfterRead=false
camel.component.aws2-s3.access-key=****
camel.component.aws2-s3.secret-key=****
camel.component.aws2-s3.region=****

# remove duplicates from messages#

camel.idempotency.enabled=true 
camel.idempotency.repository.type=memory 
camel.idempotency.expression.type=body

更改camel连接器库也很重要。最初我使用camel-aws-s3-kafka-connector source,要使用幂等使用者,我需要更改camel-aws 2-s3-kafka-connector source上的连接器

相关问题