我们的团队正在将kafka与flask应用程序集成,以实时显示数据,但我们也希望显示来自kafka的历史数据。
这样,我们就可以使用来自特定主题的所有消息,并将数据显示给用户。然而,当我们将avro消费者设置为轮询整个主题的消息时,我们一分钟只能消耗10-20万条消息,这太慢了,因为每个主题大约有250万条消息。即使我们使用相同的组id设置upp多个消费者,我们仍然没有多少性能改进。
关于如何以更快的方式获得Kafka主题的所有信息,有什么建议吗?或者将数据保存到数据库,然后从那里查询数据会更好吗?
我们的消费者:
c = Consumer({
'bootstrap.servers': 'brokers:9092',
'group.id': 'consume_all_topics',
'auto.offset.reset': 'earliest'
})
c.subscribe(['mytopic'])
now = datetime.now()
msg = c.poll(5.0)
while msg.value()['timestamp'] < now:
msg = c.poll(5.0)
1条答案
按热度按时间osh3o9ms1#
“即使我们为upp设置了多个具有相同组id的消费者,我们的性能仍然没有多大改善。
关于如何以更快的方式获得Kafka主题的所有信息,有什么建议吗?”
kafka的使用量随主题中分区的数量而扩展。请记住,一个分区只能由一个使用者组中的一个使用者使用。如果分区的数量与使用者组中的使用者数量匹配,您将获得最佳使用者性能。
此外,如果对数据进行压缩(例如
zstd
,在版本2.2.x中提供)。请注意,压缩最好在生产者端处理。