    网络应用程序一个很重要的工作是传输数据。传输数据的过程不一样取决是使用哪种交通工具,但是传输的方式是一样的:都是以字节码传输。Java开发网络程序传输数据的过程和方式是被抽象了的,我们不需要关注底层接口,只需要使用Java API或其他网络框架如Netty就能达到传输数据的目的。发送数据和接收数据都是字节码。Nothing more,nothing less。

4.1 案例研究:切换传输方式


4.1.1 使用Java的I/O和NIO


  1. package netty.in.action;
  2. import java.io.IOException;
  3. import java.io.OutputStream;
  4. import java.net.ServerSocket;
  5. import java.net.Socket;
  6. import java.nio.charset.Charset;
  7. /**
    • Blocking networking without Netty
    • @author c.k
  8. *
  9. */
  10. public class PlainOioServer {
  11. public void server(int port) throws Exception {
  12. //bind server to port
  13. final ServerSocket socket = new ServerSocket(port);
  14. try {
  15. while(true){
  16. //accept connection
  17. final Socket clientSocket = socket.accept();
  18. System.out.println("Accepted connection from " + clientSocket);
  19. //create new thread to handle connection
  20. new Thread(new Runnable() {
  21. @Override
  22. public void run() {
  23. OutputStream out;
  24. try{
  25. out = clientSocket.getOutputStream();
  26. //write message to connected client
  27. out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8")));
  28. out.flush();
  29. //close connection once message written and flushed
  30. clientSocket.close();
  31. }catch(IOException e){
  32. try {
  33. clientSocket.close();
  34. } catch (IOException e1) {
  35. e1.printStackTrace();
  36. }
  37. }
  38. }
  39. }).start();//start thread to begin handling
  40. }
  41. }catch(Exception e){
  42. e.printStackTrace();
  43. socket.close();
  44. }
  45. }
  46. }

上面的方式很简洁,但是这种阻塞模式在大连接数的情况就会有很严重的问题,如客户端连接超时,服务器响应严重延迟。为了解决这种情况,我们可以使用异步网络处理所有的并发连接,但问题在于NIO和OIO的API是完全不同的,所以一个用OIO开发的网络应用程序想要使用NIO重构代码几乎是重新开发。 下面代码是使用Java NIO实现的例子:

  1. package netty.in.action;
  2. import java.net.InetSocketAddress;
  3. import java.net.ServerSocket;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SelectionKey;
  6. import java.nio.channels.Selector;
  7. import java.nio.channels.ServerSocketChannel;
  8. import java.nio.channels.SocketChannel;
  9. import java.util.Iterator;
  10. /**
    • Asynchronous networking without Netty
    • @author c.k
  11. *
  12. */
  13. public class PlainNioServer {
  14. public void server(int port) throws Exception {
  15. System.out.println("Listening for connections on port " + port);
  16. //open Selector that handles channels
  17. Selector selector = Selector.open();
  18. //open ServerSocketChannel
  19. ServerSocketChannel serverChannel = ServerSocketChannel.open();
  20. //get ServerSocket
  21. ServerSocket serverSocket = serverChannel.socket();
  22. //bind server to port
  23. serverSocket.bind(new InetSocketAddress(port));
  24. //set to non-blocking
  25. serverChannel.configureBlocking(false);
  26. //register ServerSocket to selector and specify that it is interested in new accepted clients
  27. serverChannel.register(selector, SelectionKey.OP_ACCEPT);
  28. final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
  29. while (true) {
  30. //Wait for new events that are ready for process. This will block until something happens
  31. int n = selector.select();
  32. if (n > 0) {
  33. //Obtain all SelectionKey instances that received events
  34. Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
  35. while (iter.hasNext()) {
  36. SelectionKey key = iter.next();
  37. iter.remove();
  38. try {
  39. //Check if event was because new client ready to get accepted
  40. if (key.isAcceptable()) {
  41. ServerSocketChannel server = (ServerSocketChannel) key.channel();
  42. SocketChannel client = server.accept();
  43. System.out.println("Accepted connection from " + client);
  44. client.configureBlocking(false);
  45. //Accept client and register it to selector
  46. client.register(selector, SelectionKey.OP_WRITE, msg.duplicate());
  47. }
  48. //Check if event was because socket is ready to write data
  49. if (key.isWritable()) {
  50. SocketChannel client = (SocketChannel) key.channel();
  51. ByteBuffer buff = (ByteBuffer) key.attachment();
  52. //write data to connected client
  53. while (buff.hasRemaining()) {
  54. if (client.write(buff) == 0) {
  55. break;
  56. }
  57. }
  58. client.close();//close client
  59. }
  60. } catch (Exception e) {
  61. key.cancel();
  62. key.channel().close();
  63. }
  64. }
  65. }
  66. }
  67. }
  68. }


4.1.2 Netty中使用I/O和NIO


  1. package netty.in.action;
  2. import java.net.InetSocketAddress;
  3. import io.netty.bootstrap.ServerBootstrap;
  4. import io.netty.buffer.ByteBuf;
  5. import io.netty.buffer.Unpooled;
  6. import io.netty.channel.Channel;
  7. import io.netty.channel.ChannelFuture;
  8. import io.netty.channel.ChannelFutureListener;
  9. import io.netty.channel.ChannelHandlerContext;
  10. import io.netty.channel.ChannelInboundHandlerAdapter;
  11. import io.netty.channel.ChannelInitializer;
  12. import io.netty.channel.EventLoopGroup;
  13. import io.netty.channel.nio.NioEventLoopGroup;
  14. import io.netty.channel.socket.oio.OioServerSocketChannel;
  15. import io.netty.util.CharsetUtil;
  16. public class NettyOioServer {
  17. public void server(int port) throws Exception {
  18. final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8));
  19. //事件循环组
  20. EventLoopGroup group = new NioEventLoopGroup();
  21. try {
  22. //用来引导服务器配置
  23. ServerBootstrap b = new ServerBootstrap();
  24. //使用OIO阻塞模式
  25. b.group(group).channel(OioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
  26. //指定ChannelInitializer初始化handlers
  27. .childHandler(new ChannelInitializer<Channel>() {
  28. @Override
  29. protected void initChannel(Channel ch) throws Exception {
  30. //添加一个“入站”handler到ChannelPipeline
  31. ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
  32. @Override
  33. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  34. //连接后,写消息到客户端,写完后便关闭连接
  35. ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);
  36. }
  37. });
  38. }
  39. });
  40. //绑定服务器接受连接
  41. ChannelFuture f = b.bind().sync();
  42. f.channel().closeFuture().sync();
  43. } catch (Exception e) {
  44. //释放所有资源
  45. group.shutdownGracefully();
  46. }
  47. }
  48. }


4.1.3 Netty中实现异步支持


  1. package netty.in.action;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.buffer.Unpooled;
  5. import io.netty.channel.ChannelFuture;
  6. import io.netty.channel.ChannelFutureListener;
  7. import io.netty.channel.ChannelHandlerContext;
  8. import io.netty.channel.ChannelInboundHandlerAdapter;
  9. import io.netty.channel.ChannelInitializer;
  10. import io.netty.channel.EventLoopGroup;
  11. import io.netty.channel.nio.NioEventLoopGroup;
  12. import io.netty.channel.socket.SocketChannel;
  13. import io.netty.channel.socket.nio.NioServerSocketChannel;
  14. import io.netty.util.CharsetUtil;
  15. import java.net.InetSocketAddress;
  16. public class NettyNioServer {
  17. public void server(int port) throws Exception {
  18. final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8));
  19. // 事件循环组
  20. EventLoopGroup group = new NioEventLoopGroup();
  21. try {
  22. // 用来引导服务器配置
  23. ServerBootstrap b = new ServerBootstrap();
  24. // 使用NIO异步模式
  25. b.group(group).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
  26. // 指定ChannelInitializer初始化handlers
  27. .childHandler(new ChannelInitializer<SocketChannel>() {
  28. @Override
  29. protected void initChannel(SocketChannel ch) throws Exception {
  30. // 添加一个“入站”handler到ChannelPipeline
  31. ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
  32. @Override
  33. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  34. // 连接后,写消息到客户端,写完后便关闭连接
  35. ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);
  36. }
  37. });
  38. }
  39. });
  40. // 绑定服务器接受连接
  41. ChannelFuture f = b.bind().sync();
  42. f.channel().closeFuture().sync();
  43. } catch (Exception e) {
  44. // 释放所有资源
  45. group.shutdownGracefully();
  46. }
  47. }
  48. }


4.2 Transport API




  • 传输数据时,将数据从一种格式转换到另一种格式
  • 异常通知
  • Channel变为有效或无效时获得通知
  • Channel被注册或从EventLoop中注销时获得通知
  • 通知用户特定事件

  • eventLoop(),返回分配给Channel的EventLoop

  • pipeline(),返回分配给Channel的ChannelPipeline
  • isActive(),返回Channel是否激活,已激活说明与远程连接对等
  • localAddress(),返回已绑定的本地SocketAddress
  • remoteAddress(),返回已绑定的远程SocketAddress
  • write(),写数据到远程客户端,数据通过ChannelPipeline传输过去



  1. Channel channel = ...
  2. //Create ByteBuf that holds data to write
  3. ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8);
  4. //Write data
  5. ChannelFuture cf = channel.write(buf);
  6. //Add ChannelFutureListener to get notified after write completes
  7. cf.addListener(new ChannelFutureListener() {
  8. @Override
  9. public void operationComplete(ChannelFuture future) {
  10. //Write operation completes without error
  11. if (future.isSuccess()) {
  12. System.out.println(.Write successful.);
  13. } else {
  14. //Write operation completed but because of error
  15. System.err.println(.Write error.);
  16. future.cause().printStacktrace();
  17. }
  18. }
  19. });


  1. final Channel channel = ...
  2. //Create ByteBuf that holds data to write
  3. final ByteBuf buf = Unpooled.copiedBuffer("your data",CharsetUtil.UTF_8);
  4. //Create Runnable which writes data to channel
  5. Runnable writer = new Runnable() {
  6. @Override
  7. public void run() {
  8. channel.write(buf.duplicate());
  9. }
  10. };
  11. //Obtain reference to the Executor which uses threads to execute tasks
  12. Executor executor = Executors.newChachedThreadPool();
  13. // write in one thread
  14. //Hand over write task to executor for execution in thread
  15. executor.execute(writer);
  16. // write in another thread
  17. //Hand over another write task to executor for execution in thread
  18. executor.execute(writer);

此外,这种方法保证了写入的消息以相同的顺序通过写入它们的方法。想了解所有方法的使用可以参考Netty API文档。

4.3 Netty包含的传输实现


  • NIO,io.netty.channel.socket.nio,基于java.nio.channels的工具包,使用选择器作为基础的方法。
  • OIO,io.netty.channel.socket.oio,基于java.net的工具包,使用阻塞流。
  • Local,io.netty.channel.local,用来在虚拟机之间本地通信。
  • Embedded,io.netty.channel.embedded,嵌入传输,它允许在没有真正网络的运输中使用ChannelHandler,可以非常有用的来测试ChannelHandler的实现。

4.3.1 NIO - Nonblocking I/O

    NIO传输是目前最常用的方式,它通过使用选择器提供了完全异步的方式操作所有的I/O,NIO从Java 1.4才被提供。NIO中,我们可以注册一个通道或获得某个通道的改变的状态,通道状态有下面几种改变:
  • 一个新的Channel被接受并已准备好
  • Channel连接完成
  • Channel中有数据并已准备好读取
  • Channel发送数据出去

  • OP_ACCEPT,有新连接时得到通知

  • OP_CONNECT,连接完成后得到通知
  • OP_READ,准备好读取数据时得到通知
  • OP_WRITE,写入数据到通道时得到通知




4.3.2 OIO - Old blocking I/O



4.3.3 Local - In VM transport



4.3.4 Embedded transport

    Netty还包括嵌入传输,与之前讲述的其他传输实现比较,它是不是一个真的传输呢?若不是一个真的传输,我们用它可以做什么呢?Embedded transport允许更容易的使用不同的ChannelHandler之间的交互,这也更容易嵌入到其他的ChannelHandler实例并像一个辅助类一样使用它们。它一般用来测试特定的ChannelHandler实现,也可以在ChannelHandler中重新使用一些ChannelHandler来进行扩展,为了实现这样的目的,它自带了一个具体的Channel实现,即:EmbeddedChannel。

4.4 每种传输方式在什么时候使用?

  • OIO,在低连接数、需要低延迟时、阻塞时使用
  • NIO,在高连接数时使用
  • Local,在同一个JVM内通信时使用
  • Embedded,测试ChannelHandler时使用

