我正在尝试向Kafka主题发送300000条消息,每条消息的大小约为1500字节。
最初发送的299996条消息持续了不到一分钟,但Kafka在外面逗留了一段时间,没有发送其余的消息。很奇怪。
我的Kafka制作者配置:
private void initKafka() {
Properties configProperties = new Properties();
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaLocation);
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProducer = new KafkaProducer<String, String>(configProperties);
}
我的代码:
for (ResponseDocument responseDocument : documents.getDocuments()) {
try {
LinkedHashMap<String, Collection<? extends Object>> fields = convertToMap(responseDocument);
String jsonDoc = objectMapper.writeValueAsString(fields);
String docId = responseDocument.getFirstValueAsString(".id");
ProducerRecord<String, String> record = new ProducerRecord<String, String>(kafkaTopic, docId, jsonDoc);
kafkaProducer.send(record);
} catch (Exception e) {
LOGGER.error(ErrorCode.DATA_ACCESS_ERROR, "Failed to send document with .id {0}: {1}",responseDocument.getId(), e.getMessage());
}
}
我试着对配置、发送的消息数(改为250000条)和其他一些想法进行了一些调整。。
你知道吗?
谢谢,这是预付款。
暂无答案!
目前还没有任何答案,快来回答吧!