如何处理Kafka连接Flume中的背压?

4xrmg8kj  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(437)

我们构建了一个定制的kafka连接接收器,它反过来调用远程restapi。如何将背压传播到kafka connect基础结构,以便在远程系统比内部使用者向put()传递消息慢的情况下调用put()的频率更低?kafka connect文档说我们不应该阻塞in put(),而应该阻塞in flush()。但是在put()中不阻塞意味着我们必须缓冲数据,如果调用put()的频率高于flush(),那么在某些时候肯定会导致oom异常。我已经看到kafka消费者可以调用pause()或block in the loop()。有没有可能在Kafka连接接收器中利用这一点?

dauxcl2d

dauxcl2d1#

我已经看到kafka消费者可以调用pause()或block in the loop()。有没有可能在Kafka连接接收器中利用这一点?
原始消费者没有暴露,所以没有。你可以打电话 /pause 在整个连接器上,尽管我不确定此时未刷新的消息会发生什么。
但是在put()中不阻塞意味着我们必须缓冲数据,这肯定会在某个时候导致oom异常
当然可以,但这确实是将数据保留时间过长的唯一可行选择。例如,s3和hdfs连接器就是这样工作的。 rotate.interval.ms 调用文件提交的时间间隔(毫秒)。。。
您的http客户端连接可能会阻塞请求,不是吗?
另一种方法是让您的http服务器嵌入一个kafka使用者,这样它就可以轮询消息本身并在本地对其执行操作,而不需要通过http发送请求。

相关问题