我能控制时间间隔吗 put()
我的kafka连接接收器任务的方法是否触发?在这方面,Kafka连接框架的预期行为是什么?理想情况下,我希望指定,例如,“除非您有x个新记录/y个新字节,或者自上次调用以来经过了z毫秒,否则不要调用我”。这可能会使sink任务中的批处理逻辑更简单(引用文档,“在许多情况下,内部缓冲非常有用,因此可以一次发送整批记录,从而减少将事件插入下游数据存储的开销)。
我能控制时间间隔吗 put()
我的kafka连接接收器任务的方法是否触发?在这方面,Kafka连接框架的预期行为是什么?理想情况下,我希望指定,例如,“除非您有x个新记录/y个新字节,或者自上次调用以来经过了z毫秒,否则不要调用我”。这可能会使sink任务中的批处理逻辑更简单(引用文档,“在许多情况下,内部缓冲非常有用,因此可以一次发送整批记录,从而减少将事件插入下游数据存储的开销)。
1条答案
按热度按时间rseugnpd1#
今天,从一个
SinkTask
仅当在WorkerSinkTask
. 好消息是deliverMessages
happens在poll中,因此您应该通过重写使用者属性来控制轮询新记录的频率。如果您想做内部缓冲,您可以看看hdfsconnector在其任务实现中是如何处理的。但是,现在,connect将立即放置投票返回的任何记录。
所有这些都说明,如果您确实希望在消息到达下游系统之前对其进行批处理,那么可以考虑查看offset.flush.interval.ms和offset.flush.timeout.ms,它们控制消息的频率
flush()
已调用。