11.3 实现
WebSocket使用HTTP升级机制从一个普通的HTTP连接WebSocket,因为这个应用程序使用WebSocket总是开始于HTTP(s),然后再升级。什么时候升级取决于应用程序本身。直接执行升级作为第一个操作一般是使用特定的url请求。
在这里,如果url的结尾以/ws结束,我们将只会升级到WebSocket,否则服务器将发送一个网页给客户端。升级后的连接将通过WebSocket传输所有数据。逻辑图如下:
11.3.1 处理http请求
服务器将作为一种混合式以允许同时处理http和websocket,所以服务器还需要html页面,html用来充当客户端角色,连接服务器并交互消息。因此,如果客户端不发送/ws的uri,我们需要写一个ChannelInboundHandler用来处理FullHttpRequest。看下面代码:
[java] view plaincopy
- package netty.in.action;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelFutureListener;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.DefaultFileRegion;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.handler.codec.http.DefaultFullHttpResponse;
- import io.netty.handler.codec.http.DefaultHttpResponse;
- import io.netty.handler.codec.http.FullHttpRequest;
- import io.netty.handler.codec.http.FullHttpResponse;
- import io.netty.handler.codec.http.HttpHeaders;
- import io.netty.handler.codec.http.HttpResponse;
- import io.netty.handler.codec.http.HttpResponseStatus;
- import io.netty.handler.codec.http.HttpVersion;
- import io.netty.handler.codec.http.LastHttpContent;
- import io.netty.handler.ssl.SslHandler;
- import io.netty.handler.stream.ChunkedNioFile;
- import java.io.RandomAccessFile;
- /**
- WebSocket,处理http请求
- @author c.k
- */
- public class HttpRequestHandler extends
- SimpleChannelInboundHandler<FullHttpRequest> {
- //websocket标识
- private final String wsUri;
- public HttpRequestHandler(String wsUri) {
- this.wsUri = wsUri;
- }
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg)
- throws Exception {
- //如果是websocket请求,请求地址uri等于wsuri
- if (wsUri.equalsIgnoreCase(msg.getUri())) {
- //将消息转发到下一个ChannelHandler
- ctx.fireChannelRead(msg.retain());
- } else {//如果不是websocket请求
- if (HttpHeaders.is100ContinueExpected(msg)) {
- //如果HTTP请求头部包含Expect: 100-continue,
- //则响应请求
- FullHttpResponse response = new DefaultFullHttpResponse(
- HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
- ctx.writeAndFlush(response);
- }
- //获取index.html的内容响应给客户端
- RandomAccessFile file = new RandomAccessFile(
- System.getProperty("user.dir") + "/index.html", "r");
- HttpResponse response = new DefaultHttpResponse(
- msg.getProtocolVersion(), HttpResponseStatus.OK);
- response.headers().set(HttpHeaders.Names.CONTENT_TYPE,
- "text/html; charset=UTF-8");
- boolean keepAlive = HttpHeaders.isKeepAlive(msg);
- //如果http请求保持活跃,设置http请求头部信息
- //并响应请求
- if (keepAlive) {
- response.headers().set(HttpHeaders.Names.CONTENT_LENGTH,
- file.length());
- response.headers().set(HttpHeaders.Names.CONNECTION,
- HttpHeaders.Values.KEEP_ALIVE);
- }
- ctx.write(response);
- //如果不是https请求,将index.html内容写入通道
- if (ctx.pipeline().get(SslHandler.class) == null) {
- ctx.write(new DefaultFileRegion(file.getChannel(), 0, file
- .length()));
- } else {
- ctx.write(new ChunkedNioFile(file.getChannel()));
- }
- //标识响应内容结束并刷新通道
- ChannelFuture future = ctx
- .writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
- if (!keepAlive) {
- //如果http请求不活跃,关闭http连接
- future.addListener(ChannelFutureListener.CLOSE);
- }
- file.close();
- }
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
- throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
- }
11.3.2 处理WebSocket框架
WebSocket支持6种不同框架,如下图:
我们的程序只需要使用下面4个框架:
- CloseWebSocketFrame
- PingWebSocketFrame
- PongWebSocketFrame
TextWebSocketFrame
我们只需要显示处理TextWebSocketFrame,其他的会自动由WebSocketServerProtocolHandler处理,看下面代码:
[java] view plaincopy
- package netty.in.action;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.channel.group.ChannelGroup;
- import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
- import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
- /**
- WebSocket,处理消息
- @author c.k
- *
- */
- public class TextWebSocketFrameHandler extends
- SimpleChannelInboundHandler<TextWebSocketFrame> {
- private final ChannelGroup group;
- public TextWebSocketFrameHandler(ChannelGroup group) {
- this.group = group;
- }
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
- throws Exception {
- //如果WebSocket握手完成
- if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
- //删除ChannelPipeline中的HttpRequestHandler
- ctx.pipeline().remove(HttpRequestHandler.class);
- //写一个消息到ChannelGroup
- group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel()
- " joined"));
- //将Channel添加到ChannelGroup
- group.add(ctx.channel());
- }else {
- super.userEventTriggered(ctx, evt);
- }
- }
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg)
- throws Exception {
- //将接收的消息通过ChannelGroup转发到所以已连接的客户端
- group.writeAndFlush(msg.retain());
- }
- }
11.3.3 初始化ChannelPipeline
看下面代码:
[java] view plaincopy
- package netty.in.action;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelPipeline;
- import io.netty.channel.group.ChannelGroup;
- import io.netty.handler.codec.http.HttpObjectAggregator;
- import io.netty.handler.codec.http.HttpServerCodec;
- import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
- import io.netty.handler.stream.ChunkedWriteHandler;
- /**
- WebSocket,初始化ChannelHandler
- @author c.k
- *
- */
- public class ChatServerInitializer extends ChannelInitializer<Channel> {
- private final ChannelGroup group;
- public ChatServerInitializer(ChannelGroup group){
- this.group = group;
- }
- @Override
- protected void initChannel(Channel ch) throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- //编解码http请求
- pipeline.addLast(new HttpServerCodec());
- //写文件内容
- pipeline.addLast(new ChunkedWriteHandler());
- //聚合解码HttpRequest/HttpContent/LastHttpContent到FullHttpRequest
- //保证接收的Http请求的完整性
- pipeline.addLast(new HttpObjectAggregator(64 * 1024));
- //处理FullHttpRequest
- pipeline.addLast(new HttpRequestHandler("/ws"));
- //处理其他的WebSocketFrame
- pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
- //处理TextWebSocketFrame
- pipeline.addLast(new TextWebSocketFrameHandler(group));
- }
}
WebSocketServerProtcolHandler不仅处理Ping/Pong/CloseWebSocketFrame,还和它自己握手并帮助升级WebSocket。这是执行完成握手和成功修改ChannelPipeline,并且添加需要的编码器/解码器和删除不需要的ChannelHandler。
看下图:
ChannelPipeline通过ChannelInitializer的initChannel(...)方法完成初始化,完成握手后就会更改事情。一旦这样做了,WebSocketServerProtocolHandler将取代HttpRequestDecoder、WebSocketFrameDecoder13和HttpResponseEncoder、WebSocketFrameEncoder13。另外也要删除所有不需要的ChannelHandler已获得最佳性能。这些都是HttpObjectAggregator和HttpRequestHandler。下图显示ChannelPipeline握手完成:
我们甚至没注意到它,因为它是在底层执行的。以非常灵活的方式动态更新ChannelPipeline让单独的任务在不同的ChannelHandler中实现。