我们构建了一个定制的kafka连接接收器,它反过来调用远程restapi。如何将背压传播到kafka connect基础结构,以便在远程系统比内部使用者向put()传递消息慢的情况下调用put()的频率更低?kafka connect文档说我们不应该阻塞in put(),而应该阻塞in flush()。但是在put()中不阻塞意味着我们必须缓冲数据,如果调用put()的频率高于flush(),那么在某些时候肯定会导致oom异常。我已经看到kafka消费者可以调用pause()或block in the loop()。有没有可能在Kafka连接接收器中利用这一点?
1条答案
按热度按时间dauxcl2d1#
我已经看到kafka消费者可以调用pause()或block in the loop()。有没有可能在Kafka连接接收器中利用这一点?
原始消费者没有暴露,所以没有。你可以打电话
/pause
在整个连接器上,尽管我不确定此时未刷新的消息会发生什么。但是在put()中不阻塞意味着我们必须缓冲数据,这肯定会在某个时候导致oom异常
当然可以,但这确实是将数据保留时间过长的唯一可行选择。例如,s3和hdfs连接器就是这样工作的。
rotate.interval.ms
调用文件提交的时间间隔(毫秒)。。。您的http客户端连接可能会阻塞请求,不是吗?
另一种方法是让您的http服务器嵌入一个kafka使用者,这样它就可以轮询消息本身并在本地对其执行操作,而不需要通过http发送请求。