kafka connect sink任务中多久触发一次put()?

bcs8qyzn  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(300)

我能控制时间间隔吗 put() 我的kafka连接接收器任务的方法是否触发?在这方面,Kafka连接框架的预期行为是什么?理想情况下,我希望指定,例如,“除非您有x个新记录/y个新字节,或者自上次调用以来经过了z毫秒,否则不要调用我”。这可能会使sink任务中的批处理逻辑更简单(引用文档,“在许多情况下,内部缓冲非常有用,因此可以一次发送整批记录,从而减少将事件插入下游数据存储的开销)。

rseugnpd

rseugnpd1#

今天,从一个 SinkTask 仅当在 WorkerSinkTask . 好消息是 deliverMessages happens在poll中,因此您应该通过重写使用者属性来控制轮询新记录的频率。
如果您想做内部缓冲,您可以看看hdfsconnector在其任务实现中是如何处理的。但是,现在,connect将立即放置投票返回的任何记录。
所有这些都说明,如果您确实希望在消息到达下游系统之前对其进行批处理,那么可以考虑查看offset.flush.interval.ms和offset.flush.timeout.ms,它们控制消息的频率 flush() 已调用。

相关问题