我正在使用S3 Sink连接器从Kafka向S3写入记录。最终我将使用Kafka从我的数据库捕获CDC数据包,然后将这些数据包写入S3。
但是,我不希望每一个CDC数据包(在Kafka看来是一个记录)都被写入一个单独的S3对象。我希望配置一个基于大小或时间的条件,以便每X秒或Y字节的所有记录都被写入一个S3对象。
我还没有找到任何可以将记录写入一个对象的东西,但是我找到了Kafka Consumer属性fetch.min.bytes
和fetch.max.wait.ms
,它们每X秒或Y字节写入一次对象--但是多个记录仍然作为单独的对象写入。
1条答案
按热度按时间nhhxz33t1#
您不应该为此使用基本的消费者(我的意思是,您可以这样做,但是您显然需要自己编写所有的“批处理”逻辑)。
S3Kafka Connect接收器已经通过
flush.size
(记录计数,而不是字节)和/或基于时间的分区程序完成了此操作。Secor项目也值得关注。