Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.6) 之 WebSocketRoutingFilter

x33g5p2x  于2021-12-20 转载在 其他  
字(6.4k)|赞(0)|评价(0)|浏览(447)

1. 概述

本文主要分享 WebsocketRoutingFilter 的代码实现

WebsocketRoutingFilter ,Websocket 路由网关过滤器。其根据 ws:// / wss:// 前缀( Scheme )过滤处理,代理后端 Websocket 服务,提供给客户端连接。如下图 :

  • 目前一个 RouteDefinition 只能指定一个后端 WebSocket 服务。官方正在计划在 LoadBalancerClientFilter 上实现 Websocket 的负载均衡功能。也就说,未来一个 RouteDefinition 能够指定多个后端 WebSocket 服务。 

Websocket 的 RouteDefinition 配置如下 :

cloud:
    gateway:
      routes:
      - id: websocket_test
        uri: ws://localhost:9000
        order: 8000
        predicates:
        - Path=/echo

2. 环境搭建

在解析源码之前,我们先以 wscat 搭建一个 WebSocket 服务。

第一步,安装 wscat 。

npm install -g wscat

第二步,启动 wscat 。 

wscat --listen 9000

第三步,连接 wscat 。 

wscat --listen 9000

第四步,配置 RouteDefinition ,并启动 Spring Cloud Gateway 。 

cloud:
    gateway:
      routes:
      - id: websocket_test
        uri: ws://localhost:9000
        order: 8000
        predicates:
        - Path=/echo

第五步,通过 Gateway 连接 wscat 。 

wscat --connect ws://localhost:8080/echo

大功告成。

注意,wscat 同一时间仅允许一个客户端连接。

3. WebsocketRoutingFilter

org.springframework.cloud.gateway.filter.WebsocketRoutingFilter ,Websocket 路由网关过滤器。

构造方法,代码如下 :

public class WebsocketRoutingFilter implements GlobalFilter, Ordered {
	public static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";

	private final WebSocketClient webSocketClient;
	private final WebSocketService webSocketService;

	public WebsocketRoutingFilter(WebSocketClient webSocketClient) {
		this(webSocketClient, new HandshakeWebSocketService());
	}

	public WebsocketRoutingFilter(WebSocketClient webSocketClient,
			WebSocketService webSocketService) {
		this.webSocketClient = webSocketClient;
		this.webSocketService = webSocketService;
	}
	
}

#getOrder() 方法,代码如下 :

@Override
public int getOrder() {
    return Ordered.LOWEST_PRECEDENCE;
}

#filter(ServerWebExchange, GatewayFilterChain) 方法,代码如下 :

1: @Override
 2: public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
 3: 	// 获得 requestUrl
 4: 	URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
 5: 
 6: 	// 判断是否能够处理
 7: 	String scheme = requestUrl.getScheme();
 8: 	if (isAlreadyRouted(exchange) || (!scheme.equals("ws") && !scheme.equals("wss"))) {
 9: 		return chain.filter(exchange);
10: 	}
11: 
12: 	// 设置已经路由
13: 	setAlreadyRouted(exchange);
14: 
15: 	// 处理连接请求
16: 	return this.webSocketService.handleRequest(exchange,
17: 			new ProxyWebSocketHandler(requestUrl, this.webSocketClient, exchange.getRequest().getHeaders()));
18: }
  • 第 4 行 :获得 requestUrl 。

  • 第 7 至 10 行 :判断 ForwardRoutingFilter 是否能够处理该请求,需要满足两个条件 :

  • ws:// 或者 wss:// 前缀( Scheme ) 。

  • 调用 ServerWebExchangeUtils#isAlreadyRouted(ServerWebExchange) 方法,判断该请求暂未被其他 Routing 网关处理。代码如下 :

public static boolean isAlreadyRouted(ServerWebExchange exchange) {
    return exchange.getAttributeOrDefault(GATEWAY_ALREADY_ROUTED_ATTR, false);
}
  • 第 13 行 :设置该请求已经被处理。代码如下 : 
public static void setAlreadyRouted(ServerWebExchange exchange) {
    exchange.getAttributes().put(GATEWAY_ALREADY_ROUTED_ATTR, true);
}
  • 第 15 至 16 行 :调用 WebSocketService#hanldeRequest(ServerWebExchange, WebSocketHandler) 方法,处理客户端发起的连接请求( Handshake Request ) 。这个方法的实现不在本文范围内,但是良心如笔者,大概讲下涉及到的类 :

  • 主要逻辑在 org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy 类里。

  • 【第一步】 ReactorNettyRequestUpgradeStrategy 调用 reactor.ipc.netty.http.server.HttpServerWSOperations ,处理客户端发起的连接请求。处理成功,告知客户端连接成功。

  • 【第二步】ReactorNettyRequestUpgradeStrategy 调用 org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy 接口的 #handle(WebSocketSession) 方法,处理客户端 WebSocket Session 。ProxyWebSocketHandler 是 WebSocketHandler 的实现类,在 「3.1 ProxyWebSocketHandler」 来详细解析 #handle(WebSocketSession) 实现了什么逻辑。

3.1 ProxyWebSocketHandler

org.springframework.cloud.gateway.filter.WebsocketRoutingFilter.ProxyWebSocketHandler ,代理后端 WebSocket 服务处理器。

构造方法,代码如下 :

1: private static class ProxyWebSocketHandler implements WebSocketHandler {
 2: 
 3: 	private final WebSocketClient client;
 4: 	private final URI url;
 5: 	private final HttpHeaders headers;
 6: 	private final List<String> subProtocols;
 7: 
 8: 	public ProxyWebSocketHandler(URI url, WebSocketClient client, HttpHeaders headers) {
 9: 		this.client = client;
10: 		this.url = url;
11: 		this.headers = new HttpHeaders();//headers;
12: 		//TODO: better strategy to filter these headers?
13: 		headers.entrySet().forEach(header -> {
14: 			if (!header.getKey().toLowerCase().startsWith("sec-websocket")
15: 					&& !header.getKey().equalsIgnoreCase("upgrade")
16: 					&& !header.getKey().equalsIgnoreCase("connection")) {
17: 				this.headers.addAll(header.getKey(), header.getValue());
18: 			}
19: 		});
20: 		List<String> protocols = headers.get(SEC_WEBSOCKET_PROTOCOL);
21: 		if (protocols != null) {
22: 			this.subProtocols = protocols;
23: 		} else {
24: 			this.subProtocols = Collections.emptyList();
25: 		}
26: 	}
27: }
  • client 属性,在 《Spring-Cloud-Gateway 源码解析 —— 网关初始化》「5.2 初始化 NettyConfiguration」 一文中,我们可以看到使用的是 org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient 实现类。通过该属性,连接后端【被代理】的 WebSocket 服务
  • url 属性,后端【被代理】的 WebSocket 服务的地址。
  • header 属性,请求头,在 《 【计网】HTTP与WebSocket的区别》 有详细解析,包括为什么【第 14 至 18 行】的代码这样处理。
  • subProtocols 属性,最终通信使用的协议。

#handle(WebSocketSession) 方法,代码如下 :

1: @Override
 2: public Mono<Void> handle(WebSocketSession session) {
 3: 	// pass headers along so custom headers can be sent through
 4: 	return client.execute(url, this.headers, new WebSocketHandler() {
 5: 		@Override
 6: 		public Mono<Void> handle(WebSocketSession proxySession) {
 7: 			// Use retain() for Reactor Netty
 8: 			// 转发消息 客户端 =》后端服务
 9: 			Mono<Void> proxySessionSend = proxySession
10: 					.send(session.receive().doOnNext(WebSocketMessage::retain));
11: 			// 转发消息 后端服务=》客户端
12: 			// .log("proxySessionSend", Level.FINE);
13: 			Mono<Void> serverSessionSend = session
14: 					.send(proxySession.receive().doOnNext(WebSocketMessage::retain));
15: 					// .log("sessionSend", Level.FINE);
16: 
17: 			// 
18: 			return Mono.when(proxySessionSend, serverSessionSend).then();
19: 		}
20: 
21: 		/**
22: 		 * Copy subProtocols so they are available downstream.
23: 		 * @return
24: 		 */
25: 		@Override
26: 		public List<String> getSubProtocols() {
27: 			return ProxyWebSocketHandler.this.subProtocols;
28: 		}
29: 	});
30: }
  • 第 6 行 :调用 WebSocketClient#execute(URI, HttpHeaders, WebSocketHandler) 方法,连接后端【被代理】的 WebSocket 服务。连接成功后,回调 WebSocketHandler 实现的内部类的 #handle(WebSocketSession) 方法。

  • WebSocketHandler 实现的内部类

  • 第 9 至 10 行 :转发消息,客户端 => 后端服务。

  • 第 13 至 14 行 :转发消息,后端服务 => 客户端。

  • 第 18 行 :调用 Mono#when() 方法,合并 proxySessionSend / serverSessionSend 两个 Mono 。调用 Mono#then() 方法,参数为空,合并的 Mono 不发射数据出来。RxJava 和 Reactor 类似,可以参考 《ReactiveX文档中文翻译 —— And/Then/When》 学习下 when / and / then 操作符。

  • 下图可以帮助理解下这个类的用途 :

相关文章