发送大量消息

lkaoscv7  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(192)

我正在尝试向Kafka主题发送300000条消息,每条消息的大小约为1500字节。
最初发送的299996条消息持续了不到一分钟,但Kafka在外面逗留了一段时间,没有发送其余的消息。很奇怪。
我的Kafka制作者配置:

private void initKafka() {
    Properties configProperties = new Properties();
    configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaLocation);
    configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    kafkaProducer = new KafkaProducer<String, String>(configProperties);
}

我的代码:

for (ResponseDocument responseDocument : documents.getDocuments()) {
                try {
                    LinkedHashMap<String, Collection<? extends Object>> fields = convertToMap(responseDocument);
                    String jsonDoc = objectMapper.writeValueAsString(fields);
                    String docId =  responseDocument.getFirstValueAsString(".id");
                    ProducerRecord<String, String> record = new ProducerRecord<String, String>(kafkaTopic, docId, jsonDoc);
                    kafkaProducer.send(record);
                } catch (Exception e) {
                    LOGGER.error(ErrorCode.DATA_ACCESS_ERROR, "Failed to send document with .id {0}: {1}",responseDocument.getId(), e.getMessage());
                }
            }

我试着对配置、发送的消息数(改为250000条)和其他一些想法进行了一些调整。。
你知道吗?
谢谢,这是预付款。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题