我想使用kafka connect s3 sink连接器将主题的数据流传输到s3 bucket。主题中的数据将是xml消息。根据连接器配置,我们可以定义消息的格式(例如:jsonformat)
根据合流文档,我们可以通过实现 io.confluent.connect.storage.format.Format.
我浏览了可用的格式代码,比如jsonformat,看起来格式的实际逻辑在jsonrecordwriterprovider中,jsonrecordwriterprovider是 io.confluent.connect.storage.format.RecordWriterProvider
我看到recordwriter编写实现将jsonconvertor convert方法应用于sinkrecord.value()。
我们如何知道sinkrecord包含什么?我们可以编写一个xmlconvertor并将sinkrecord.value()转换成dom对象吗?
有什么我可以用来实现的参考资料吗?
我正在检查confluent提供的代码。
https://github.com/confluentinc/kafka-connect-storage-cloud/tree/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format
暂无答案!
目前还没有任何答案,快来回答吧!