我正在尝试控制kstream使用的消息数,但不是很成功。我正在使用: max.poll.interval.ms=100 以及 max.poll.records=20 每秒收到200条信息。但这似乎不是很好,因为我看到在我的统计数据中,每秒大约有500条消息。我还应该在小溪边放些什么?
max.poll.interval.ms=100
max.poll.records=20
w7t8yxp51#
你可以在消费端使用类似akkastreamkafka(又名React式kafka)的东西。akka streams有很好的节流功能,在这里很有用:http://doc.akka.io/docs/akka/snapshot/java/stream/stream-quickstart.html#time-基于数据的处理
628mspwn2#
我使用max.poll.interval.ms=100和max.poll.records=20每秒获得大约200条消息。max.poll.interval.ms和max.poll.records属性不能以这种方式工作。max.poll.interval.ms指示使用者在主题的每次使用者轮询之间必须等待的最大时间间隔(毫秒)。max.poll.records指示使用者在主题的每个使用者轮询期间可以使用的最大记录数。每次轮询之间的间隔不受上述两个属性的控制,而是由您的消费者确认获取的记录所用的时间控制。例如,假设存在一个主题x,其中包含1000条记录,并且使用者确认获取的记录所用的时间是20毫秒。当max.poll.interval.ms=100和max.poll.records=20时,使用者将每20毫秒轮询一次Kafka主题,并且在每次轮询中,最多将获取20条记录。如果确认获取的记录所用的时间大于max.poll.interval.ms,则轮询将被视为失败,并且该特定批将从kafka主题重新轮询。
mwecs4sa3#
Kafka康苏美尔(也是一个内部使用的 KafkaStreams 尽可能快地读取记录。您提到的参数可能会对性能产生影响,但无法控制实际的数据速率。还要注意的是 max.poll.records 只配置有多少条记录 poll() 返回,但对客户机代理通信没有影响。一 KafkaConsumer 可以在与代理交谈时获取更多记录,然后在上返回缓冲消息 poll() 只要记录在缓冲区中(即,在这种情况下, poll() 是一个客户端操作符,它只确保您不会通过 max.poll.interval.ms ). 因此,您可能对 fetch.max.bytes ,它确定从代理获取的字节的大小。如果减少此参数,则使用者的效率会降低,因此吞吐量会降低(但不建议这样做)。配置吞吐量的另一种方法是配额(https://kafka.apache.org/documentation/#design_quotas)它是一种代理端配置,允许您限制客户端可以读取和/或写入的数据量。在kafka流中(以及使用普通kafkaconsumer时)最好的方法是限制对的调用 poll() 手动。对于Kafka流,可以添加 Thread.sleep() 任何自定义项。如果您不想将其附加到现有的操作符中,您只需添加一个 foreach() 使用临时状态(即,一个类成员变量)跟踪吞吐量,并计算相应地调节吞吐量所需的睡眠时间。
KafkaStreams
max.poll.records
poll()
KafkaConsumer
max.poll.interval.ms
fetch.max.bytes
Thread.sleep()
foreach()
3条答案
按热度按时间w7t8yxp51#
你可以在消费端使用类似akkastreamkafka(又名React式kafka)的东西。akka streams有很好的节流功能,在这里很有用:
http://doc.akka.io/docs/akka/snapshot/java/stream/stream-quickstart.html#time-基于数据的处理
628mspwn2#
我使用max.poll.interval.ms=100和max.poll.records=20每秒获得大约200条消息。
max.poll.interval.ms和max.poll.records属性不能以这种方式工作。
max.poll.interval.ms指示使用者在主题的每次使用者轮询之间必须等待的最大时间间隔(毫秒)。
max.poll.records指示使用者在主题的每个使用者轮询期间可以使用的最大记录数。
每次轮询之间的间隔不受上述两个属性的控制,而是由您的消费者确认获取的记录所用的时间控制。
例如,假设存在一个主题x,其中包含1000条记录,并且使用者确认获取的记录所用的时间是20毫秒。当max.poll.interval.ms=100和max.poll.records=20时,使用者将每20毫秒轮询一次Kafka主题,并且在每次轮询中,最多将获取20条记录。如果确认获取的记录所用的时间大于max.poll.interval.ms,则轮询将被视为失败,并且该特定批将从kafka主题重新轮询。
mwecs4sa3#
Kafka康苏美尔(也是一个内部使用的
KafkaStreams
尽可能快地读取记录。您提到的参数可能会对性能产生影响,但无法控制实际的数据速率。还要注意的是
max.poll.records
只配置有多少条记录poll()
返回,但对客户机代理通信没有影响。一KafkaConsumer
可以在与代理交谈时获取更多记录,然后在上返回缓冲消息poll()
只要记录在缓冲区中(即,在这种情况下,poll()
是一个客户端操作符,它只确保您不会通过max.poll.interval.ms
). 因此,您可能对fetch.max.bytes
,它确定从代理获取的字节的大小。如果减少此参数,则使用者的效率会降低,因此吞吐量会降低(但不建议这样做)。配置吞吐量的另一种方法是配额(https://kafka.apache.org/documentation/#design_quotas)它是一种代理端配置,允许您限制客户端可以读取和/或写入的数据量。
在kafka流中(以及使用普通kafkaconsumer时)最好的方法是限制对的调用
poll()
手动。对于Kafka流,可以添加Thread.sleep()
任何自定义项。如果您不想将其附加到现有的操作符中,您只需添加一个foreach()
使用临时状态(即,一个类成员变量)跟踪吞吐量,并计算相应地调节吞吐量所需的睡眠时间。