我使用apachecamel2.20.2(以及springboot1.5.8)将通过mqtt传入的消息路由到另外两个服务,一个是http/soap(worksok),另一个是apachekafka。
在以每分钟大约320条消息的速度运行了一夜之后,我注意到这个过程变得非常缓慢。经过一些分析之后,我发现kafka路由产生了内存泄漏(当然禁用了http路由)。
@Component
public class Router extends RouteBuilder {
@Autowired
ApplicationProperties param;
@Override
public void configure() throws Exception {
logger.info("Starting MqttKafkaBridgeApplication with:\n" + param.toString());
// MQTT Consumer
from("mqtt:vernemq?"
+ "host=tcp:MqttHost:MqttPort...")
.transform(body().convertToString())
.log("Recieved : "+body().convertToString())
.multicast()
.stopOnException().to( "direct:kafka");
// Kafka Producer
from("direct:kafka")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0);
exchange.getIn().setHeader(KafkaConstants.KEY, "1");
}
})
.to("kafka:" + "kafkaTopic" +
"?brokers=kafkaHost:KafkaPort;
}
}
我对camel还不熟悉,但据我所知,我的配置很简单?我可以看到消息到达Kafka集群,所以不知道为什么内存没有释放?
visualvm的屏幕截图,可以看到字节数和字符数组随着每条消息的增长而增长:
1条答案
按热度按时间nx7onnlm1#
经过进一步的调查,看起来camel使用的内存和你给它的一样多,只要gc运行,一切看起来都很好。我现在在docker容器中运行应用程序,没有任何重大更改,也没有任何与内存相关的问题。