如何减慢或设置Kafka流消费者给定的速度?

epggiuax  于 2021-06-08  发布在  Kafka
关注(0)|答案(3)|浏览(567)

我正在尝试控制kstream使用的消息数,但不是很成功。
我正在使用: max.poll.interval.ms=100 以及 max.poll.records=20 每秒收到200条信息。
但这似乎不是很好,因为我看到在我的统计数据中,每秒大约有500条消息。
我还应该在小溪边放些什么?

w7t8yxp5

w7t8yxp51#

你可以在消费端使用类似akkastreamkafka(又名React式kafka)的东西。akka streams有很好的节流功能,在这里很有用:
http://doc.akka.io/docs/akka/snapshot/java/stream/stream-quickstart.html#time-基于数据的处理

628mspwn

628mspwn2#

我使用max.poll.interval.ms=100和max.poll.records=20每秒获得大约200条消息。
max.poll.interval.ms和max.poll.records属性不能以这种方式工作。
max.poll.interval.ms指示使用者在主题的每次使用者轮询之间必须等待的最大时间间隔(毫秒)。
max.poll.records指示使用者在主题的每个使用者轮询期间可以使用的最大记录数。
每次轮询之间的间隔不受上述两个属性的控制,而是由您的消费者确认获取的记录所用的时间控制。
例如,假设存在一个主题x,其中包含1000条记录,并且使用者确认获取的记录所用的时间是20毫秒。当max.poll.interval.ms=100和max.poll.records=20时,使用者将每20毫秒轮询一次Kafka主题,并且在每次轮询中,最多将获取20条记录。如果确认获取的记录所用的时间大于max.poll.interval.ms,则轮询将被视为失败,并且该特定批将从kafka主题重新轮询。

mwecs4sa

mwecs4sa3#

Kafka康苏美尔(也是一个内部使用的 KafkaStreams 尽可能快地读取记录。
您提到的参数可能会对性能产生影响,但无法控制实际的数据速率。还要注意的是 max.poll.records 只配置有多少条记录 poll() 返回,但对客户机代理通信没有影响。一 KafkaConsumer 可以在与代理交谈时获取更多记录,然后在上返回缓冲消息 poll() 只要记录在缓冲区中(即,在这种情况下, poll() 是一个客户端操作符,它只确保您不会通过 max.poll.interval.ms ). 因此,您可能对 fetch.max.bytes ,它确定从代理获取的字节的大小。如果减少此参数,则使用者的效率会降低,因此吞吐量会降低(但不建议这样做)。
配置吞吐量的另一种方法是配额(https://kafka.apache.org/documentation/#design_quotas)它是一种代理端配置,允许您限制客户端可以读取和/或写入的数据量。
在kafka流中(以及使用普通kafkaconsumer时)最好的方法是限制对的调用 poll() 手动。对于Kafka流,可以添加 Thread.sleep() 任何自定义项。如果您不想将其附加到现有的操作符中,您只需添加一个 foreach() 使用临时状态(即,一个类成员变量)跟踪吞吐量,并计算相应地调节吞吐量所需的睡眠时间。

相关问题