我有两个 Spring Boot 服务:一个是 STOMP 客户端,另一个是服务器。服务器端配置了指向 ActiveMQ 代理的 STOMP 中继。
一般来说,一切都按预期工作,但随着时间的推移,客户机服务中的内存使用量会增加,最终导致内存不足。
我做了一次堆转储(heap dump)。根据分析报告,我附上了疑似内存泄漏区域的片段。
我知道我在客户端/服务器中设置的最大缓冲区限制很高,但是消息的数量可能很大。请帮助我们理解什么可能是罪魁祸首。我已将配置和堆转储报告粘贴在下面。
websocket客户端配置
/**
 * Client-side STOMP-over-WebSocket configuration.
 *
 * <p>Exposes a {@link WebSocketStompClient} bean (SockJS over a standard
 * WebSocket transport, Jackson message conversion, heartbeats) and a
 * retryable {@link #initSession(String, String)} entry point that opens an
 * authenticated session against the configured server endpoint.
 */
@Configuration
public class WebSocketConfig extends WebSocketMessagingAutoConfiguration {

    // Plain and secure endpoint URIs; enableSSL decides which one initSession connects to.
    @Value("${coMsgClient.config.websocket.server.host}")
    private String host;

    @Value("${coMsgClient.config.websocket.server.secureHost}")
    private String secureHost;

    @Value("${coMsgClient.config.websocket.server.enableSSL}")
    private boolean enableSSL;

    // Sent as the client-id STOMP header on connect.
    @Value("${coMsgClient.config.websocket.client.id}")
    public String clientId;

    // NOTE(review): the two buffer properties and the two session-timeout properties
    // below are injected but never read inside this class; stompClient() uses
    // hard-coded buffer sizes instead. They are public, so other classes may read
    // them - confirm before wiring them in or removing them.
    @Value("${coMsgClient.config.websocket.buffer.msg.text}")
    public int txtMsgBuffer;

    @Value("${coMsgClient.config.websocket.buffer.msg.binary}")
    public int binaryMsgBuffer;

    @Value("${coMsgClient.config.websocket.session.timeout}")
    public int wsSessionTimeout;

    @Value("${coMsgClient.config.websocket.session.timeout.enabled}")
    public boolean wsSessionTimeoutEnabled;

    @Autowired
    @Lazy
    private StompSessionHandlerImpl stompSessionHandlerImpl;

    @Lazy
    @Autowired
    private WebSocketStompClient webSocketStompClient;

    /**
     * Opens a STOMP session against the secure or plain endpoint, depending on
     * {@code enableSSL}, and waits for the connect future to complete.
     *
     * <p>The access token is sent as {@code "Bearer <token>"} both in the
     * WebSocket handshake headers and in the STOMP connect headers; the
     * configured client id is added to the STOMP headers only.
     *
     * <p>Retry policy: up to 720 attempts with a 5000 backoff delay (original
     * author's intent: retry every 5 sec for about an hour on loss of connection).
     *
     * @param orgId       organisation identifier; used here only in log messages
     * @param accessToken token placed in the authorization headers
     * @return the session produced by the connect future
     * @throws ExecutionException   if the connect future completes exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Retryable(maxAttempts = 720,
            backoff = @Backoff(delay = 5000, multiplier = 1, random = true))
    public synchronized StompSession initSession(String orgId, String accessToken) throws ExecutionException, InterruptedException {
        log.info("Client Session for {} to be initiated", orgId);

        final String bearerValue = "Bearer ".concat(accessToken);

        final StompHeaders connectHeaders = new StompHeaders();
        connectHeaders.add(CommonConstants.STOMP_HEADER_AUTHORIZATION, bearerValue);
        connectHeaders.add(CommonConstants.STOMP_HEADER_CLIENT_ID, clientId);

        final WebSocketHttpHeaders handshakeHeaders = new WebSocketHttpHeaders();
        handshakeHeaders.add(CommonConstants.STOMP_HEADER_AUTHORIZATION, bearerValue);

        // Only the target URI differs between the secure and plain variants.
        final URI endpoint = URI.create(enableSSL ? secureHost : host);
        final StompSession openedSession = webSocketStompClient
                .connect(endpoint, handshakeHeaders, connectHeaders, stompSessionHandlerImpl)
                .get();

        log.info("Client Session for {} is initiated", orgId);
        return openedSession;
    }

    /**
     * Builds the STOMP client: a SockJS client over a single standard WebSocket
     * transport, a Jackson converter that tolerates empty beans, a dedicated
     * task scheduler, and a 20000/20000 default heartbeat.
     *
     * @return the fully configured client
     */
    @Bean
    public WebSocketStompClient stompClient() {
        // Raise the container's default binary/text message buffer sizes to 50,000,000.
        WebSocketContainer wsContainer = ContainerProvider.getWebSocketContainer();
        wsContainer.setDefaultMaxBinaryMessageBufferSize(50000000);
        wsContainer.setDefaultMaxTextMessageBufferSize(50000000);

        WebSocketClient nativeClient = new StandardWebSocketClient(wsContainer);
        List<Transport> sockJsTransports = List.of(new WebSocketTransport(nativeClient));
        WebSocketStompClient client = new WebSocketStompClient(new SockJsClient(sockJsTransports));

        // Serialising a bean with no properties must not fail.
        ObjectMapper jsonMapper = new ObjectMapper();
        jsonMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
        MappingJackson2MessageConverter jsonConverter = new MappingJackson2MessageConverter();
        jsonConverter.setObjectMapper(jsonMapper);
        client.setMessageConverter(jsonConverter);

        // NOTE(review): this scheduler is created locally and is not exposed as a bean,
        // so its shutdown is presumably not container-managed; a pool size of 10000
        // is also unusually large - confirm both are intended.
        ThreadPoolTaskScheduler heartbeatScheduler = new ThreadPoolTaskScheduler();
        heartbeatScheduler.setPoolSize(10000);
        heartbeatScheduler.initialize();
        client.setTaskScheduler(heartbeatScheduler);

        client.setDefaultHeartbeat(new long[]{20000, 20000});

        // NOTE(review): Integer.MAX_VALUE effectively removes both the receipt time
        // limit and the inbound message size cap - verify against the memory budget.
        client.setReceiptTimeLimit(Integer.MAX_VALUE);
        client.setInboundMessageSizeLimit(Integer.MAX_VALUE);

        return client;
    }
}
websocket服务器配置
/**
 * Server-side WebSocket/STOMP configuration.
 *
 * <p>Relays {@code /queue}, {@code /topic} and {@code /exchange} destinations
 * to an external STOMP broker, registers the {@code /websocket} SockJS
 * endpoint, and tunes container buffers and transport limits. The
 * {@code host}, {@code user} and {@code password} fields are presumably bound
 * from {@code websocket.broker.*} properties through the generated setters.
 */
@Configuration
@EnableWebSocketMessageBroker
@ConfigurationProperties(prefix = "websocket.broker")
@Setter
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    private String host;
    private String password;
    private String user;

    /**
     * Enables the STOMP broker relay and sets the application destination prefix.
     * The same user/password pair is used for both the client and the system
     * connection to the broker.
     *
     * @param config registry to configure
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/queue", "/topic", "/exchange")
                .setRelayHost(host)
                // Credentials: identical for system and client connections.
                .setSystemLogin(user)
                .setSystemPasscode(password)
                .setClientLogin(user)
                .setClientPasscode(password)
                .setSystemHeartbeatSendInterval(20000)
                // Broadcast destinations for unresolved user messages and the user registry.
                .setUserDestinationBroadcast("/topic/unresolved-user")
                .setUserRegistryBroadcast("/topic/log-user-registry");

        config.setApplicationDestinationPrefixes("/device");
    }

    /**
     * Registers the {@code /websocket} endpoint with SockJS (WebSocket transport
     * enabled) and installs a default STOMP sub-protocol error handler.
     *
     * @param registry endpoint registry to configure
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // NOTE(review): "*" accepts connections from any origin - confirm this is intended.
        registry.addEndpoint("/websocket")
                .setAllowedOrigins("*")
                .withSockJS()
                .setWebSocketEnabled(true);

        registry.setErrorHandler(new StompSubProtocolErrorHandler());
    }

    /**
     * Sets the servlet container's maximum text and binary message buffer
     * sizes to 1,000,000 each.
     *
     * @return the container factory bean
     */
    @Bean
    public ServletServerContainerFactoryBean createServletServerContainerFactoryBean() {
        ServletServerContainerFactoryBean factoryBean = new ServletServerContainerFactoryBean();
        factoryBean.setMaxBinaryMessageBufferSize(1_000_000);
        factoryBean.setMaxTextMessageBufferSize(1_000_000);
        return factoryBean;
    }

    /**
     * Exposes a {@link DefaultSimpUserRegistry} as a bean.
     *
     * @return a new registry instance
     */
    @Bean
    public DefaultSimpUserRegistry getDefaultSimpRegistry() {
        return new DefaultSimpUserRegistry();
    }

    /**
     * Configures transport limits and wraps every WebSocket handler in an
     * {@code EmaWebSocketHandlerDecorator}.
     *
     * @param registry transport registration to configure
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
        // NOTE(review): Integer.MAX_VALUE effectively removes the message size and
        // send buffer caps - verify against the server's memory budget.
        registry.setMessageSizeLimit(Integer.MAX_VALUE)
                .setSendBufferSizeLimit(Integer.MAX_VALUE)
                .setTimeToFirstMessage(300000)
                .setSendTimeLimit(300000)
                .addDecoratorFactory(delegate -> new EmaWebSocketHandlerDecorator(delegate));
    }
}
暂无答案!
目前还没有任何答案,快来回答吧!