在消息发送的过程中,涉及俩个线程,main线程和sender线程,在main线程中创建一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafkabroker
参数名称 | 描述 |
---|---|
bootstrap.servers | 生产者连接集群所需的 broker 地址清单。例如<br>hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker里查找到其他 broker信息 |
key.serializer和 value.serializer | 指定发送消息的 key 和 value 的序列化类型。一定要写全类名 |
buffer.memory | RecordAccumulator 缓冲区总大小,默认 32m。 |
batch.size | 缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加 |
linger.ms | 如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms之间 |
acks | 0:生产者发送过来的数据,不需要等数据落盘应答。<br>1:生产者发送过来的数据,Leader收到数据后应答。<br>-1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和all是等价的 |
max.in.flight.requests.per.connection | 允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字 |
retries | 当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是 int最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了 |
retry.backoff.ms | 两次重试之间的时间间隔,默认是 100ms |
enable.idempotence | 是否开启幂等性,默认 true,开启幂等性 |
compression.type | 生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd |
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
public class MyProducer {
private final static String TOPIC_NAME = "my-topic-test-1";
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1.创建kafka生产者配置对象
Properties props = new Properties();
//2.给 kafka 配置对象添加配置信息:bootstrap.servers
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "1.14.252.45:19092,1.14.252.45:19093,1.14.252.45:19094");
//key,value 序列化(必须)
//把发送的key从字符串序列化为字节数组
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//把发送消息value从字符串序列化为字节数组
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 3. 创建 kafka 生产者对象
Producer<String, String> producer = new KafkaProducer<>(props);
Order order = new Order((long) 1, 100);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, order.getId().toString(), JSON.toJSONString(order));
//4. 调用 send 方法,发送消息
RecordMetadata metadata = producer.send(producerRecord).get();
producer.close();
//=====阻塞=======
System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Order {
private Long id;
private Integer num;
}
因为这里我们选择的是同步发送消息,在收到kafka的ack告知发送成功之前一直处于阻塞状态
这里我们进入docker部署的kafka02容器中开启消费者,然后重新在运行一遍消费者
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME,0, order.getId().toString(), JSON.toJSONString(order));
未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNum
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME,order.getId().toString(), JSON.toJSONString(order));
默认就是异步发送
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。
生产者发消息,发送完后不用等待broker给回复,直接执行下面的业务逻辑。可以提供callback,让broker异步的调用callback,告知生产者,消息发送的结果
public class MyProducer2 {
public static void main(String[] args) throws InterruptedException {
//1.创建kafka生产者配置对象
Properties props = new Properties();
//2.给 kafka 配置对象添加配置信息:bootstrap.servers
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "1.14.252.45:19092,1.14.252.45:19093,1.14.252.45:19094");
//key,value 序列化(必须)
//把发送的key从字符串序列化为字节数组
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//把发送消息value从字符串序列化为字节数组
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 3. 创建 kafka 生产者对象
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 5; i++) {
Order order = new Order((long) i, 100);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("first", order.getId().toString(), JSON.toJSONString(order));
//4. 调用 send 方法,发送消息
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("发送消息失败:" +
exception.getStackTrace());
}
if (metadata != null) {
System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
}
}
});
}
producer.close();
}
}
在同步发消息的场景下:生产者发动broker上后,ack会有 3 种不同的选择
props.put(ProducerConfig.ACKS_CONFIG, "1");
数据完全可靠条件 = ACK 级别设置为-1 + 分区副本大于等于2 + ISR 里应答的最小 副本 数量大于等于2
public class CustomProducerAcks {
public static void main(String[] args) {
// 0 配置
Properties properties = new Properties();
// 连接集群 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
// 指定对应的key和value的序列化类型 key.serializer
// properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// acks
properties.put(ProducerConfig.ACKS_CONFIG,"1");
// 重试次数
properties.put(ProducerConfig.RETRIES_CONFIG,3);
// 1 创建kafka生产者对象
// "" hello
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 2 发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));
}
// 3 关闭资源
kafkaProducer.close();
}
}
public class CustomProducerParameters {
public static void main(String[] args) {
// 配置
Properties properties = new Properties();
// 连接kafka集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
// 序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 缓冲区大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
// 批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
// linger.ms
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// 压缩
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
// 1 创建生产者
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 2 发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));
}
// 3 关闭资源
kafkaProducer.close();
}
}
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
精确一次( (Exactly Once) ) = 幂等性 + 至少一次( ( ack=-1 + 分区副本数>=2 + ISR 最小副本数量>=2) )
如何使用幂等性
开启参数 enable.idempotence 默认为 true,false关闭
事务相关API
// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
单个 Producer,使用事务保证消息的仅一次发送
public class CustomProducerTranactions {
public static void main(String[] args) {
// 0 配置
Properties properties = new Properties();
// 连接集群 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
// 指定对应的key和value的序列化类型 key.serializer
// properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 指定事务id
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranactional_id_01");
// 1 创建kafka生产者对象
// "" hello
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
try {
// 2 发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i));
}
int i = 1 / 0;
kafkaProducer.commitTransaction();
} catch (Exception e) {
kafkaProducer.abortTransaction();
} finally {
// 3 关闭资源
kafkaProducer.close();
}
}
}
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_43296313/article/details/125525493
内容来源于网络,如有侵权,请联系作者删除!