我们使用的是confluent的kafka s3连接器,版本5.2.1。在分布式辅助进程设置中使用一个节点运行。
根据文档,我们应该能够在大小和基于时间的间隔上将刷新设置为s3。
我们使用以下刷新设置
{
"rotate.interval.ms": 300000, // 5 minutes
"flush.size": 1000,
"timestamp.extractor": "Wallclock" // default
... (other settings)
}
但是我没有看到任何数据被写在一个只有不到1000条消息但有可用数据的主题中。
但是,当我将设置更改为较小的刷新大小并删除旋转间隔时:
{
"flush.size": 5, // some small amount
"timestamp.extractor": "Wallclock", // default
... (other settings)
}
在所有其他设置相同的情况下,我可以立即看到s3 bucket中的数据。
我没有改变任何其他设置,所以我很有信心,有连接到s3,我看到任务工人正在部署。
我错过什么了吗?
1条答案
按热度按时间wsewodh21#
如果你想每5分钟,你应该选择
rotate.schedule.interval.ms
,强制在此间隔内连接到转储文件。此配置确保每个配置的间隔都调用文件提交。。。提交将在计划的时间执行,而不考虑以前的提交时间或消息数
rotate.interval.ms
将对照批处理中第一个使用的记录检查记录时间戳。时间间隔是通过使用时间戳提取器来确定的
如果您的记录少于刷新大小,那么整个批处理只会在内存中等待,直到新记录的时间戳差大于第一次看到的记录。
flush.size
总是优先于所有其他时间设置,在写文件时,我最后一次看源代码。