我在一个视频教程中看到,当制作者发布消息时,kafka代理支持3种类型的确认。
0-开火并忘记1-领队确认2-确认所有经纪人
我正在使用kafka的javaapi发布消息。这是必须使用特定于每个代理的server.properties为每个代理设置的内容,还是必须由生产者设置的内容?如果必须由生产商设置,请解释如何使用JavaAPI设置。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
public class KafkaProducerApp {
public static void main(String[] args){
Properties properties = new Properties();
properties.put("bootstrap.servers","localhost:9092,localhost:9093,localhost:9094");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
try{
for(int i=0;i<150;i++) {
RecordMetadata ack = kafkaProducer.send(new ProducerRecord<String, String>("replicated_topic", Integer.toString(i), "MyMessage" + Integer.toString(i))).get();
System.out.println(" Offset = " + ack.offset());
System.out.println(" Partition = " + ack.partition());
}
} catch (Exception ex){
ex.printStackTrace();
} finally {
kafkaProducer.close();
}
}
}
2条答案
按热度按时间bybem2ql1#
它是生产者属性,设置与代码中的其他属性类似:
所有可配置生产者属性的列表可以在这里找到。
您可能还需要查看broker(或topic)属性
min.insync.replicas
与此生产者配置相关的。iaqfqrcu2#
我认为您应该了解acks属性实际上做了什么,并查看幕后的情况。如果这是确定的,您将看到这个属性是由生产者配置的。
例如,不能丢失任何消息,如审核日志。下面的代码说明了如何启动producer配置:
这是一个小而有力的变化,它对消息是否会到达有着重大影响。
这张Kafka在《行动》一书中所代表的形象更为清晰
acks
属性: