新版本的Kafka制作者还有“producer.type”吗?

zsohkypk  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(406)

旧版本的文档说这是一个基本属性。
新版本的文档根本没有提到。
Kafka制作人的新版本还有吗 producer.type ?
或者,新的制片人总是 async ,我应该打电话 future.get() 为了成功 sync ?

ggazkfy8

ggazkfy81#

新的生产者总是异步的,您应该调用future.get()使其同步。当像添加future.get()这样简单的东西为您提供基本相同的功能时,创建两个API方法是不值得的。
从send()的文档中
https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/producer/kafkaproducer.html
由于send调用是异步的,因此它返回将分配给该记录的recordmetadata的未来。在此将来调用get()将被阻止,直到相关请求完成,然后返回记录的元数据或引发发送记录时发生的任何异常。
如果要模拟简单的阻塞调用,可以立即调用get()方法:

byte[] key = "key".getBytes();
byte[] value = "value".getBytes();  
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value);
producer.send(record).get();
s5a0g9ez

s5a0g9ez2#

你为什么要做这个 send() 要同步吗?
这是一个kafka特性,用于批处理消息以获得更好的吞吐量。
异步发送
批处理是提高效率的主要驱动因素之一,为了实现批处理,kafka生产者将尝试在内存中积累数据,并在单个请求中发送更大的批。批处理可以配置为累积不超过固定数量的消息,并且等待时间不超过某个固定的延迟限制(例如64k或10ms)。这允许累积更多的字节来发送,并且在服务器上很少有更大的i/o操作。这种缓冲是可配置的,并提供了一种机制来权衡少量的额外延迟以获得更好的吞吐量。
因为api只支持异步方法,所以无法执行发送同步,但是可以指定一些配置来执行周围的工作。
可以将batch.size设置为0。在这种情况下,消息bacthing被禁用。
不过,我认为您应该保留batch.size默认值,并将linger.ms设置为0(这也是默认值)。在这种情况下,如果多条消息同时出现,它们将立即在一次发送中被批处理。
生产者将在请求传输之间到达的所有记录组合到一个单独的批处理请求中。通常情况下,只有当记录到达的速度比发送的速度快时,才会发生这种情况。
如果您想确保消息成功发送并持久化,您可以将acks设置为-1或1,retries设置为3(例如)
关于producer配置的更多信息,您可以参考https://kafka.apache.org/documentation/#producerconfigs

相关问题