我在用Kafka和Spring Boot:
Kafka制作人课程:
@Service
public class MyKafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private static Logger LOGGER = LoggerFactory.getLogger(NotificationDispatcherSender.class);
// Send Message
public void sendMessage(String topicName, String message) throws Exception {
LOGGER.debug("========topic Name===== " + topicName + "=========message=======" + message);
ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topicName, message);
result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
LOGGER.debug("sent message='{}' with offset={}", message, result.getRecordMetadata().offset());
}
@Override
public void onFailure(Throwable ex) {
LOGGER.error(Constants.PRODUCER_MESSAGE_EXCEPTION.getValue() + " : " + ex.getMessage());
}
});
}
}
Kafka配置:
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=100000
spring.kafka.producer.request.timeout.ms=30000
spring.kafka.producer.linger.ms=10
spring.kafka.producer.acks=0
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.max.block.ms=5000
spring.kafka.bootstrap-servers=192.168.1.161:9092,192.168.1.162:9093
假设我在topic中发送了10次1000条消息 my-test-topic
.
10次中有8次我成功地获取了消费者的所有消息,但有时我会遇到以下错误: 2017-10-05 07:24:11, [ERROR] [my-service - LoggingProducerListener - onError:76] Exception thrown when sending a message with key='null' and payload='{"deviceType":"X","deviceKeys":[{"apiKey":"X-X-o"}],"devices...' to topic my-test-topic
以及 org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for my-test-topic-4 due to 30024 ms has passed since batch creation plus linger time
1条答案
按热度按时间lf3rwulv1#
有3种可能性:
增加
request.timeout.ms
-这是Kafka在缓冲区等待整批准备就绪的时间。所以在您的例子中,如果缓冲区中的消息少于100000条,就会发生超时。更多信息请点击此处:https://stackoverflow.com/a/34794261/2707179减少
batch-size
-与前一点相关,它将更频繁地发送批处理,但它们将包含更少的消息。根据消息大小,可能您的网络无法赶上高负载?检查吞吐量是否不是瓶颈。