I am fetching a large stream of data from the network using the reactor library and sending it to a Kafka broker via reactive kafka. Below is the Kafka producer I am using:
public class LogProducer {

    private final KafkaSender<String, String> sender;

    public LogProducer(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "log-producer");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        SenderOptions<String, String> senderOptions = SenderOptions.create(props);
        sender = KafkaSender.create(senderOptions);
    }

    public void sendMessages(String topic, Flux<Logs.Data> records) {
        AtomicInteger sentCount = new AtomicInteger(0);
        AtomicInteger fCount = new AtomicInteger(0);
        records.doOnNext(r -> fCount.incrementAndGet()).subscribe();
        System.out.println("Total Records: " + fCount);
        sender.send(records.doOnNext(r -> sentCount.incrementAndGet())
                .map(record -> {
                    LogRecord lrec = record.getRecords().get(0);
                    String id = lrec.getId();
                    return SenderRecord.create(new ProducerRecord<>(topic, id,
                            lrec.toString()), id);
                }))
                .then()
                .doOnError(e -> {
                    log.error("[FAIL]: Send to the topic: '{}' failed. " + e, topic);
                })
                .doOnSuccess(s -> {
                    log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);
                })
                .subscribe();
    }
}
The total number of records in the flux (fCount) and the number of records sent to the kafka topic (sentCount) do not match; no error is raised and the pipeline completes successfully. For example, in one case the total record count in the flux was 2758, while the count sent to Kafka was 256. Is there a Kafka configuration that needs changing, or am I missing something?
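One thing worth noting about the snippet above: `records` is subscribed twice (once for counting, once for sending), and `System.out.println("Total Records: " + fCount)` runs right after `subscribe()` returns, which is before the asynchronous stream has completed, so the printed count can be partial. A stdlib-only sketch of that timing hazard (the background thread and latch are illustrative stand-ins, not Reactor API):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncCountDemo {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger count = new AtomicInteger(0);
        CountDownLatch done = new CountDownLatch(1);

        // Simulates subscribing to an asynchronous stream of 1000 elements.
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                count.incrementAndGet();
            }
            done.countDown();
        });
        producer.start();

        // Reading the counter immediately (as the question's println does)
        // may observe a partial total, because the "stream" is still running.
        int early = count.get();

        done.await(); // wait for the asynchronous work to finish
        int finalCount = count.get();

        System.out.println("early=" + early + " final=" + finalCount);
        // finalCount is always 1000; early can be anything from 0 to 1000.
    }
}
```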
Update based on the comments:
sender.send(records
        .map(record -> {
            LogRecord lrec = record.getRecords().get(0);
            String id = lrec.getId();
            sleep(5); // sleep for 5 ns
            return SenderRecord.create(new ProducerRecord<>(topic, id,
                    lrec.toString()), id);
        }))
        .then()
        .doOnError(e -> {
            log.error("[FAIL]: Send to the topic: '{}' failed. " + e, topic);
        })
        .doOnSuccess(s -> {
            log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);
        })
        .subscribe();
sleep(10); // sleep for 10 ns
The above code works well on one system, but on another system it fails to send all the messages.
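A `sleep` after `subscribe()` only masks the underlying race: `subscribe()` returns immediately, and whether the pipeline finishes before the surrounding code (or the JVM) moves on depends on machine speed, which would explain behavior differing between systems. A stdlib-only sketch of the same fire-and-forget pattern, and of waiting for completion deterministically instead (the executor and loop are illustrative assumptions, not the Reactor Kafka API):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FireAndForgetDemo {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger sent = new AtomicInteger(0);
        ExecutorService pool = Executors.newSingleThreadExecutor();

        // Fire-and-forget submission, analogous to .subscribe():
        pool.submit(() -> {
            for (int i = 0; i < 1_000; i++) {
                sent.incrementAndGet(); // stands in for one record being sent
            }
        });

        // A fixed sleep may or may not be long enough on a given machine.
        // Waiting for completion explicitly is deterministic:
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);

        System.out.println("sent=" + sent.get());
    }
}
```

In Reactor terms, the analogous deterministic wait would be blocking on the `Mono<Void>` from `then()` (or otherwise awaiting its completion) rather than sleeping for a fixed duration.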