如何使用pykafka的消息批处理或缓冲区生成kafka主题。我的意思是一个生产者可以在一个生产过程中产生许多信息。我知道使用消息批处理或缓冲消息的概念,但我不知道如何实现它。我希望有人能帮我
qjp7pelc1#
pykafka在producer中透明地处理消息批处理—您不必执行任何特殊操作来确保消息是成批生成的。这个 Producer 类提供了一组配置选项,允许您自定义批处理行为。文档中提供了这些选项的完整列表,但其中一些最重要的选项是: max_queued_messages -当你已经 produce() 如果有更多的消息,请立即发送该批消息 min_queued_messages -当你已经 produce() 至少有这么多信息,发一批 linger_ms -当上一批货过了这么长时间后,请发送该批货
Producer
max_queued_messages
produce()
min_queued_messages
linger_ms
rkue9o1l2#
就用这个 send() 方法。你不需要自己管理它。send()是异步的。调用时,它将记录添加到挂起记录发送的缓冲区中,并立即返回。这允许制作者将单个记录批处理在一起以提高效率。你的任务只是在这方面配置两个道具:批量大小和延迟时间。生产者为每个分区维护未发送记录的缓冲区。这些缓冲区的大小由'batch\u size'配置指定。将其变大会导致更多的批处理,但需要更多的内存(因为每个活动分区通常都有一个缓冲区)。这两个道具将通过以下方式完成:一旦我们得到一个分区的批大小值的记录,不管这个设置如何,它都会立即被发送,但是如果我们为这个分区积累的字节数少于这么多,我们将“逗留”指定的时间,等待更多的记录出现。
send()
2条答案
按热度按时间qjp7pelc1#
pykafka在producer中透明地处理消息批处理—您不必执行任何特殊操作来确保消息是成批生成的。这个
Producer
类提供了一组配置选项,允许您自定义批处理行为。文档中提供了这些选项的完整列表,但其中一些最重要的选项是:max_queued_messages
-当你已经produce()
如果有更多的消息,请立即发送该批消息min_queued_messages
-当你已经produce()
至少有这么多信息,发一批linger_ms
-当上一批货过了这么长时间后,请发送该批货rkue9o1l2#
就用这个
send()
方法。你不需要自己管理它。send()是异步的。调用时,它将记录添加到挂起记录发送的缓冲区中,并立即返回。这允许制作者将单个记录批处理在一起以提高效率。
你的任务只是在这方面配置两个道具:批量大小和延迟时间。
生产者为每个分区维护未发送记录的缓冲区。这些缓冲区的大小由'batch\u size'配置指定。将其变大会导致更多的批处理,但需要更多的内存(因为每个活动分区通常都有一个缓冲区)。
这两个道具将通过以下方式完成:
一旦我们得到一个分区的批大小值的记录,不管这个设置如何,它都会立即被发送,但是如果我们为这个分区积累的字节数少于这么多,我们将“逗留”指定的时间,等待更多的记录出现。