你的位置:首页 > Java教程

[Java教程]Netty之引题


本文介绍Java BIO(同步阻塞IO),伪异步IO,NIO(非阻塞IO),AIO(异步IO)这四种IO的情况,并对不同IO模型作比较。

目录

1.BIO

2.伪异步IO

3.NIO

4.AIO

5.四种IO比较

6.BIO\伪异步IO\NIO\AIO源码下载

 

1.BIO

采用BIO通信模型的服务器,通常由一个独立的Acceptor线程负责监听客户端的连接,它接收到客户端连接请求后为每个客户端创建一个新的线程进程链路连接处理,处理完后,通过输出流返回应答给客户端,线程销毁。

该模型最大的问题性能问题,当客户端并发访问增加后,服务端线程增加,当线程数膨胀后,系统的性能下降,随着并发量增大,系统会发生线程堆栈溢出、创建新线程失败等问题,最终导致线程宕机或者僵死,不能对外提供服务。而且开线程有很大的开销,影响服务器性能。

源码在src/main/java/BIO下,分为客户端和服务端,简单的网络、线程的处理。

 

2.伪异步IO

为了解决同步阻塞IO面临的一个链接需要一个线程处理情况,现在引入了“池”的概念,加入了线程池。

当有新的客户端连接的时候,将客户端的Socket封装为Task(java的Runnable接口实现了)投递到后端线程池中进行处理。由于线程池可以设置消息队列的大小和最大线程数,因此它的资源是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机。

伪异步IO通讯框架采用了线程池的实现,因此避免了为每个请求都创建一个独立的线程造成的线程资源耗尽问题。但是由于它的底层的通信依然采用的同步阻塞模型,因此无法从根本上解决问题。

java输出流InputStream:当对socket的输入流进行读操作时,它会一直阻塞下去,直到发生以下三种事件。

  • 有数据可读;
  • 可用数据已经读取完毕;
  • 发生空指针或者IO异常。

这意味着当对方发数据请求或者应答消息缓慢(网络传输慢)时,读取写入流一方的通讯线程将长时间阻塞,如果对方要100s才有消息发生完成,读取的一方的IO线程也会将同步阻塞100s,在此时间里,其他接入消息只能在消息队列中排队。

java输入流OutputStream:当调用OutputStream的write方法写输出流的时候,它将会呗阻塞,直到所有要发送的字节全部写入完毕,或者发生异常。搞过TCP/IP的都晓得,当消息的接收方处理缓慢的时候,将不能及时从TCP缓冲区读取数据,这将导致发送方的TCP window size不断减小,直到为0,双方处于keep-alive状态,消息发送方就不能再将TCP缓冲区写入数据,这时采用同步阻塞的IO,write操作将会无限期阻塞,直到tcp window size大于0或者发生IO异常。

源码在src/main/java/PseudoAsynchronousIO下,分为客户端和服务端。客户端和BIO的客户端一样,服务端加入了线程池ExecutorService,相关构造函数请读者自行查阅。

 

3.NIO

 NIO库,是在JDK1.4中引入的,NIO弥补了同步阻塞IO的不足。在所有的数据,NIO都是用缓冲区处理掉的(Buffer),任何时候访问NIO中的数据,都是通过缓冲区进行操作。缓冲区实际就是一个数组。Java NIO的基础是多路复用器Selector,简单来说,selector会不断的轮询注册在其上的Channel(通道,全双工的),如果某个Channel上有新的TCP连接接入、读写事件,这个Channel会处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪的select集合,进行后续的IO操作。

一个多路复用器可以同时轮询多个Channel,而且由于jdk使用了epoll替代了select实现,所以没有最大连接句柄的限制。(题外话,这里说的eopll、select是说的linux下的IO复用,和select、epoll一样,清楚流程概念请直接看源码)。

NIO服务端序列图

1.打开ServerSocketChannel,用于监听客户端的连接,它是所有客户端连接的父管道。

 ServerSocketChannel accptorSvr = ServerSocketChannel.open(); 

2.绑定监听端口,设置连接为非阻塞模式。

acceptorSvr.socket().bind(  new InetSocketAddress(InetAddress.getByName("IP"),port));acceptorSvr.configureBlocking(false);

3.创建Reactor线程,创建多路复用器并启动线程。

Selector selectot = Selector.open();new Thread(new RectorTask()).start();

4.将SelectSocketChannel注册到Reactor线程的多路复用器selector上,监听accept事件。

SelectionKey key = acceptorSvr.register(selector,SelectionKey.OP_ACCEPT,ioHandler);

5.多路复用器在线程run方法中无线循环里轮询准备就绪的key。

int num = selector.select();Set selectkeys = selector.selectedKeys();Iterator it = selectkeys.iterator();while(it.hasNext){   SelectionKey key = (SelectionKey)it.next;   /*   deal with IO event  */  }

 6.多路复用监听到有新的用户接入,处理新的接入请求,完成TCP三次握手,建立物理连接。

SocketChannel sc = ssc.accept();

7.设置客户端链路为非阻塞模式

sc.configureBlocking(false);sc.socket().setReuseAddress(true);...

8.将新接入的客户端连接注册到Reactor线程的多路复用器上,监听读操作,用来读取客户端发送的网络消息。

SelectionKey key = sc.register(selector,SelectionKey.OP_READ,ioHangler);

9.异步读取客户端请求消息到缓冲区

int readNumber = channel.read(receivedBuffer);

10.对bytebuffer进行编解码,如果有半包消息指针reset,继续读取后续的报文,将解码成功的消息封装成task,投递到业务线程池中,进行业务逻辑处理。

Object message = null;    while (buffer.hasRemain()){      byteBuffer.mark();      Object message = decode(byteBuffer);      if(message==null){        byteBuffer.reset();        break;      }      messageList.add(message);    }    if(!byteBuffer.hasRemain()){      byteBuffer.clear();    }    else byteBuffer.compact();    if(messageList!=null & !messageList.isEmpty()) {      for(Object messageF:messageList)        handleTask(messageE);    }

11.将pojo对象encode成bytebuffer,调用SocketChannel的异步write接口,将消息异步发送到客户端。

socketChannel.wite(buffer);

注意:如果发送区TCP缓冲区满了,会导致写半包,此时,需要注册写操作位,循环写,直到整个包消息写入TCP缓冲区。

 

NIO客户端序列图(大多数和服务端类似)

1.打开SocketChannel,绑定客户端本地地址(可选,默认系统会随机分配一个可用的本地地址)

 SocketChannel clientChannel = SocketChannel.open(); 

2.设置SocketChannel为非阻塞模式,同时设置连接的TCP参数。

SocketChannel.configureBlocking(false);
socket.setReuseAddress(true);
socket.setReceiveBufferSize(BUFFER_SIZE);
socket.setSendBufferSize(BUFFER_SIZE);

3.异步连接服务器。

boolean connected = clientChannel.connect(new InetSocketAdress("ip",port));

4.判断是否连接成功,如果成功,则直接注册读状态位到多路复用器中,如果没成功(异步连接,返回false,说明客户端已经已经发送sync包,服务端没有返回ack包,物理连接还没建立——关于ack、sync包,请读者自行查阅TCP/IP中的TCP的三次握手,四次分手的过程)

if(connect)
  clientChannel.register(selector,SelectionKey.OP_READ,ioHandler);
else
  clientChannel.register(selector,SelectionKey.OP_CONNECT,ioHandler);

5.向Reactor线程的多路复用器注册OP_CONNECT状态位,监听服务器的TCP ACK应答。

clientChannel.register(selector,SelectionKey.OP_CONNECT,ioHandler);

 6.创建Reactor线程,创建多路复用器并启动线程。

Selector selectot = Selector.open();new Thread(new RectorTask()).start();

7.多路复用器在线程run方法中无线循环里轮询准备就绪的key。

int num = selector.select();Set selectkeys = selector.selectedKeys();Iterator it = selectkeys.iterator();while(it.hasNext){   SelectionKey key = (SelectionKey)it.next;   /*   deal with IO event  */  }

8.接收connect事件进行处理

if(key.isConnectable())
  //handlerConnect();

9.判断连接结果,如果连接成功,注册读事件到多路复用器

if(channel.finishConnect())
  registerRead();

10.注册读事件到多路复用器

clientChannel.register(selector,SelectionKey.OP_READ,ioHandler);

11.异步读取客户端请求消息到缓冲区

int readNumber = channel.read(receivedBuffer);

12.对bytebuffer进行编解码,如果有半包消息指针reset,继续读取后续的报文,将解码成功的消息封装成task,投递到业务线程池中,进行业务逻辑处理。

Object message = null;    while (buffer.hasRemain()){      byteBuffer.mark();      Object message = decode(byteBuffer);      if(message==null){        byteBuffer.reset();        break;      }      messageList.add(message);    }    if(!byteBuffer.hasRemain()){      byteBuffer.clear();    }    else byteBuffer.compact();    if(messageList!=null & !messageList.isEmpty()) {      for(Object messageF:messageList)        handleTask(messageE);    }

13.将pojo对象encode成bytebuffer,调用SocketChannel的异步write接口,将消息异步发送到客户端。

socketChannel.wite(buffer);

 

注:以上的客户端和服务端过程,了解就行,上层的代码不一定这样写的,具体参考能运行的代码。

源码在src/main/java/NIO下,分为客户端和服务端。

4.AIO

 NIO2.0中引入了新的异步通道的概念,并提供了异步文件通道h额异步套接字通道的实现。

异步通道提供2种方式获取操作结果:

  • 通过java.util.concurrent.Futurn类来表示异步操作的结果;
  • 在执行异步操作的时候传入一个java.nio.channels.

CompletionHandler接口的实现类作为操作完成的回溯。

NIO2.0的异步套接字通道,对应UNIX网络编程中的事件驱动IO(AIO),它不需要通过多路复用器(Selector)对注册的通道进行轮询操作。

源码在src/main/java/AIO下,分为客户端和服务端。

 

5.四种IO比较

6.BIO\伪异步IO\NIO\AIO源码下载

GitHub地址:https://github.com/orange1438/Netty_Course