尝试按照互联网上的指令实现Kafka异步生产。我的制作人是这样的:
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public void asynSend(String topic, Integer partition, String message) {
ProducerRecord<Object, Object> data = new ProducerRecord<>(topic, partition,null, message);
producer.send(data, new DefaultProducerCallback());
}
private static class DefaultProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
logger.error("Asynchronous produce failed");
}
}
}
我在for循环中产生这样的结果:
for (int i = 0; i < 5000; i++) {
int partition = i % 2;
FsProducerFactory.getInstance().asynSend(topic, partition,i + "th message to partition " + partition);
}
但是,有些信息可能会丢失。如下图所示,从4508到4999的消息未送达。
我发现原因可能是生产者进程的关闭和所有在缓存中没有发送的消息将丢失。在for循环之后添加这一行可以解决此问题:
producer.flush();
然而,我不确定这是否是一个有魅力的解决方案,因为我注意到有人提到flush将使异步发送以某种方式同步,有人能解释或帮助我改进它吗?
1条答案
按热度按时间eiee3dmh1#
在书中
Kafka - The definitive Guide
这里有一个aszncronousproducer的例子,它与您编写的代码完全相同。它使用send
和一个Callback
.讨论中写道:
添加
flush()
在退出之前,将使客户机等待任何未完成的消息被传递到代理(这将发生)queue.buffering.max.ms
,加上延迟)。如果你加上flush()
每次之后produce()
呼叫您正在有效地实现同步生产者。但如果你在比赛结束后
for
它不再是同步的,而是异步的。你能做的就是设置
acks
在生产者配置中all
. 这样,当主题的复制设置为大于1时,您将有更多的保证来成功生成消息。