Netty系列之——WebSocket服务器

1. WebSocket原理

1.1 背景

在使用Netty实现我们自己的Websocket服务器之前,我们不妨来了解一下WebSocket的原理。如今,WebSocket的应用已经很广泛,从IM即时通信应用,如QQ和微信;再到Android上的消息推送,都可以用WebSocket来实现。

但是,Websocket是在2008年诞生,2011年才成为国际标准。你可能要问了,那么在WebSocket没出现之前,这些功能都是怎么实现的呢?对了!你可能首先想到的是轮询:

  • Ajax轮询:实现的方法很简单:就是让浏览器隔个几秒就发送一次请求,询问服务器是否有新信息
  • 长连接(long poll):和Ajax轮询差不多,不过采用的是阻塞的机制。也就是说:客户端发起连接后,如果当前服务器没消息,就一直不返回客户端响应。直到有消息后才返回,客户端收到消息后再进行长连接,周而复始。

上面的两种方式都是基于HTTP实现的,都体现出HTTP协议的一个特点:被动性,即客户端只能向服务器发出请求,服务器返回查询结果,无法做到服务器主动向客户端推送信息。但是,这两种方式都有各自的缺点:Ajax轮询依赖于服务器很好的处理性能;长连接需要有很高的并发。总之轮询的效率低,非常浪费资源。因此WebSocket就是这样出现的!

1.2 原理

首先,我们来看下典型的客户端Websocket协议升级的请求报文,可以看到采用的是标准的HTTP报文格式:

1
2
3
4
5
6
7
8
GET /ws HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==
Sec-WebSocket-Version: 13
Sec-WebSocket-Protocol: soap, wamp
Origin: http://example.com

如果你对普通的HTTP的请求报文的字段了解,可以发现多了一些字段,这是协议升级的核心部分

  • Connection: Upgrade:表示要进行升级协议
  • Upgrade: websocket:表示要升级为Websocket协议

以及其他的报文字段:

  • Sec-WebSocket-Key:Base64编码的值,提供基本的连接安全。
  • Sec-WebSocket-Version:表示升级的WebSocket版本。如果服务器端不支持该版本,将会返回支持的版本号。
  • Sec-WebSocket-Protocol:指定子协议,如果服务器不同意该子协议则可以不发送任何Sec-WebSocket-Protocol header

客户端将报文发送给服务器端,如果升级成功,服务器将会以101状态码响应报文:

1
2
3
4
HTTP/1.1 101 Switching Protocols
Connection:Upgrade
Upgrade: websocket
Sec-WebSocket-Accept: Oy4NRAQ13jhfONC7bP8dTKb4PTU=

这样就实现了Websocket的升级握手阶段,可以看出,使用WebSocket的应用程序始终以HTTP协议开始,然后再执行升级。这一点非常关键,接下来我们就来使用Netty来实现我们自己的WebSocket服务器。

2. Netty实现

2.1 服务端

2.1.1 处理协议升级

上面我们提到,WebSocket应用是需要通过HTTP协议开始执行升级协议的。所以我们首先创建HttpRequestHandler组件来处理协议转换转发的逻辑。

HttpRequestHandler中,通过构造函数传入WebSocket路径,例如/ws,也就是说如果客户端的HTTP请求是指向地址为/ws时,HttpRequestHandler将会传递给ChannelHandler来处理升级握手的逻辑(如1操作所示)。否则正常处理HTTP的请求逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* @author Pushy
* @since 2018/11/4 20:36
*/
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

private final String wsUri;

public HttpRequestHandler(String url) {
this.wsUri = url;
}

protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
// 1. 如果请求的URI是/ws,将它传递给下一个 ChannelHandler进行升级握手
if (wsUri.equalsIgnoreCase(request.uri())) {
ctx.fireChannelRead(request.retain());
} else {
// 2. 处理正常的HTTP请求
if (HttpUtil.is100ContinueExpected(request)) {
send100Continue(ctx);
}
String data = "Welcome to WebSocket server";
HttpResponse response = new DefaultHttpResponse(request.protocolVersion(), HttpResponseStatus.OK);
response.headers().set("Content-Type", "text/html; charset=UTF-8");
response.headers().set("Content-Length", data.length());
boolean keepAlive = HttpUtil.isKeepAlive(request);
if (keepAlive) { // 如果请求了keep-alive,则添加所需要的 HTTP 头信息
response.headers().set("Connection", "keep-alive");
}
ctx.write(response);
ctx.write(Unpooled.copiedBuffer(data, CharsetUtil.UTF_8));
ChannelFuture future = ctx.writeAndFlush( // 写 LastHttpContent 并冲刷至客户端
LastHttpContent.EMPTY_LAST_CONTENT);
// 如果没有请求keep-alive,则在写操作完成后关闭 Channel
if (!keepAlive) {
future.addListener(ChannelFutureListener.CLOSE);
}
}
}
}

2.1.2 处理WebSocket帧

Netty为RFC中定义WebSocket中6种帧都提供了相应的POJO实现:

操作码(RFC中定义) POJO实现 描述
%x0 TextWebSocketFrame 延续帧
%x1 TextWebSocketFrame 文本帧,包含了文本数据
%x2 BinaryWebSocketFrame 二进制帧,包含了二进制数据
%x3-7 保留的操作代码,用于后续定义的非控制帧
%x8 CloseWebSocketFrame 表示一个CLOSE请求
%x9 PingWebSocketFrame 请求传输一个PongWebSocketFrame
%xA PongWebSocketFrame 作为一个对于 PingWebSocketFrame 的响应被发送
%xB-F 保留的操作码,用于后续定义的控制帧

了解了WebSocket的帧和Netty提供的POJO实现后,我们创建一个SimpleChannelInboundHandler来处理文本帧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* @author Pushy
* @since 2018/11/4 21:15
*/
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

private final ChannelGroup group;

public TextWebSocketFrameHandler(ChannelGroup group) {
this.group = group;
}

/**
* 重写userEventTriggered方法以处理自定义事件
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
// 握手成功,从该ChannelHandler中移除HttpRequestHandler,因此将不会接受任何HTTP消息了
ctx.pipeline().remove(HttpRequestHandler.class);
// 向已连接的客户端发送消息通知成功连接
group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined"));
// 将新的WebSocket Channel添加到ChannelGroup中接受消息
group.add(ctx.channel());
} else {
super.userEventTriggered(ctx, evt);
}
}

/**
* 处理客户端发送的消息
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
// 将客户端发送的消息写到ChannelGroup中所有已经连接的客户端
group.writeAndFlush(msg.retain());
}
}

可以看到,我们在Netty中并不需要去处理WebSocket底层升级握手的逻辑,这是因为Netty在内部已经完成了这些功能。我们只需要在ChannelPipeline中添加一个WebSocketServerProtocolHandler来处理其他类型的帧即可。除此之外,我们还需要添加HTTP相关的解编码器、用于聚合HTTP消息的HttpObjectAggregator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* @author Pushy
* @since 2018/11/4 21:23
*/
public class WSServerInitializer extends ChannelInitializer<Channel> {

private final ChannelGroup group;

public WSServerInitializer(ChannelGroup group) {
this.group = group;
}

@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(new HttpServerCodec()); // HTTP解编码器
pipeline.addLast(new HttpObjectAggregator(64 * 1024)); // 聚合HTTP消息
pipeline.addLast(new HttpRequestHandler("/ws")); // HTTP转发升级
pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); // 处理升级握手
pipeline.addLast(new TextWebSocketFrameHandler(group)); // 处理文本帧
}
}

2.1.3 引导服务器

离我们的WebSocket的服务器还差最后一步!最后,我们只需要创建ServerBootstrap来引导启动WebSocket服务器启动:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* @author Pushy
* @since 2018-11-5 21:38:09
*/
public class WebSocketServer {

private static final String host = "localhost";
private static final int port = 8080;

private static final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);

public static void main(String[] args) throws InterruptedException {

ServerBootstrap b = new ServerBootstrap();
b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new WSServerInitializer(channelGroup));

ChannelFuture f = b.bind(host, port).sync();

f.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
System.out.println("Running on http://" + host + ":" + port);
} else {
System.out.println("绑定失败!!!");
}
});

f.channel().closeFuture().sync();
}
}

这样我们就实现了最简单的WebSocket服务器,可以用Web前端测试一下,具体代码可以看index.html

1
2
3
4
5
let websocket = new WebSocket("ws://localhost:8080/ws");
websocket.onmessage = function (event) { // 监听接收服务器发送的消息
setMessageInnerHTML(`received => ${event.data}`);
};
websocket.send("Hello World"); // 发送消息

接下来打开浏览器访问http://localhost,可以发现当访问除/ws意外的任意URI时,服务器都会返回常规的HTTP响应:

2018-11-06_082957.png

而如果使用ws协议请求/wsURI时(如果是HTTP协议,服务器则会返回not a WebSocket handshake request: missing upgrade错误),服务器则会返回升级成功的响应:

upgrade.png

打开index.html,在客户端发送一条“Hello World”消息之后,服务端会发送相同的消息到客户端。但是注意,这和HTTP的双向通信是不一样的,因为服务器是在接收到消息之后向客户端主动发送消息的:

TIM截图20181105220617.png

2.1.4 实现点对点发送

前面在TextWebSocketFrameHandler中是通过ChannelGroup对象写入冲刷将消息发送到客户端的。我们可以打开两个客户端,在其中一个客户端向服务器发送消息之后,另一个客户端也能收到服务器发送给前者的消息:

TIM截图20181106084540.png

这也就说明,消息是广播的。但是如果我们想实现点对点发送,类似聊天系统的功能该如何实现呢?很简单,首先我们来看下ChannelGroup的数据结构:

1
2
3
4
5
6
7
// netty ChannelGroup.java部分源码
public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> {

Channel find(ChannelId id);

// ... other methods
}

可以看到ChannelGroup实现了Set接口,说明本质上就是个集合,存放了前面我们通过add方法添加的每个客户端的Channel对象。另外,ChannelGroup还提供find通过ChannelId可以拿到Channel实例。

知道了ChannelGroup的数据结构,那么要想实现消息的点对点发送就轻而易举了,我们修改TextWebSocketFrameHandler类的channelRead0方法:

1
2
3
4
5
6
7
8
9
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
// 广播发送
// group.writeAndFlush(msg.retain());

// 点对点发送
ChannelId id = ctx.channel().id(); // 获取当前连接客户端的ChannelId
group.find(id).writeAndFlush(new TextWebSocketFrame("Hello I'm webSocket server"));
}

现在消息是单独发送的了:

qeueue.png

2.2 客户端

用Netty来实现WebSocket也非常简单,因为Netty已经为内置了处理握手的WebSocketClientHandshaker,我们只需要写处理接收各个类型的帧逻辑即可。

因为Netty主要是一种服务端的技术,所以在这就不详细解析客户端的代码了。首先创建WebSocketClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
* @author Pushy
* @since 2018-11-5 14:29:58
*/
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {

private final WebSocketClientHandshaker handShaker;
private ChannelPromise handshakeFuture;

public WebSocketClientHandler(WebSocketClientHandshaker handShaker) {
this.handShaker = handShaker;
}

public ChannelFuture handshakeFuture() {
return handshakeFuture;
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) {
handshakeFuture = ctx.newPromise();
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
// 执行握手
handShaker.handshake(ctx.channel());
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
System.out.println("WebSocket Client disconnected!");
}

@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel();
if (!handShaker.isHandshakeComplete()) {
try {
handShaker.finishHandshake(ch, (FullHttpResponse) msg);
System.out.println("WebSocket Client connected!");
handshakeFuture.setSuccess();
} catch (WebSocketHandshakeException e) {
System.out.println("WebSocket Client failed to connect");
handshakeFuture.setFailure(e);
}
return;
}

// 对于响应是HTTP类型则抛出异常
if (msg instanceof FullHttpResponse) {
FullHttpResponse response = (FullHttpResponse) msg;
throw new IllegalStateException(
"Unexpected FullHttpResponse (getStatus=" + response.status() +
", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
}

// 根据各个服务器返回的帧类型做出相应的处理
WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
System.out.println("WebSocket Client received message: " + textFrame.text());
} else if (frame instanceof PongWebSocketFrame) {
System.out.println("WebSocket Client received pong");
} else if (frame instanceof CloseWebSocketFrame) {
System.out.println("WebSocket Client received closing");
ch.close();
}
}
}

然后引导该客户端启动:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* @author Pushy
* @since 2018-11-5 14:20:28
*/
public final class WebSocketClient {

private static final String URL = "ws://127.0.0.1:8080/ws";

public static void main(String[] args) throws Exception {
URI uri = new URI(URL);

EventLoopGroup group = new NioEventLoopGroup();
try {
final WebSocketClientHandler handler =
new WebSocketClientHandler(
WebSocketClientHandshakerFactory.newHandshaker(
// 指定协议版本
uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));

Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(
new HttpClientCodec(),
new HttpObjectAggregator(8192),
WebSocketClientCompressionHandler.INSTANCE,
handler);
}
});
Channel ch = b.connect(uri.getHost(), uri.getPort()).sync().channel();
handler.handshakeFuture().sync();

} finally {
group.shutdownGracefully();
}
}
}

最后再添加从控制台读取用户输入的数据,并发送给客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// ...
handler.handshakeFuture().sync();

BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
while (true) {
String msg = console.readLine();
if (msg == null) {
break;
} else if ("bye".equals(msg.toLowerCase())) { // 发送CloseWebSocketFrame控制帧
ch.writeAndFlush(new CloseWebSocketFrame());
ch.closeFuture().sync();
break;
} else if ("ping".equals(msg.toLowerCase())) { // 发送PingWebSocketFrame控制帧
WebSocketFrame frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[] { 8, 1, 8, 1 }));
ch.writeAndFlush(frame);
} else {
WebSocketFrame frame = new TextWebSocketFrame(msg);
ch.writeAndFlush(frame);
}
}

运行WebSocketClient,发送给服务器消息同样也能收到服务器发送的消息:

client.png

3. 心跳

在上图我们可以看到,我们还发送了一个PingWebSocketFrame的控制帧,而服务器则会回复一个PongWebSocketFrame控制帧。

PingsPongs,其实就是WebSockets的心跳

在经过握手之后的任意时刻里,无论客户端还是服务端都可以选择发送一个ping给另一方。 当ping消息收到的时候,接受的一方必须尽快回复一个pong消息。 例如,可以使用这种方式来确保客户端还是连接状态。

借助这个机制我们可以判断当前用户是否在线,如果是IM应用我们则可以缓存消息,客户端再次连接时我们再推送缓存的消息。

好了,我们创建基于Netty的最简单WebSocket服务器已经完成,如果你感兴趣,可以完善这个服务器,然后写出一个简单的聊天室的应用。

最后,代码已经上传到Github。祝你学习愉快!共勉!



参考资料

Mozilla——编写 WebSocket 服务器

Pushy wechat
欢迎订阅我的微信公众号