你的位置:首页 > Java教程

[Java教程]NIO及Reactor模式


 

  • 关于Nio

  Java NIO即Java Non-blocking IO(Java非阻塞I/O),是Jdk1.4之后增加的一套操作I/O工具包,又被叫做Java New IO。

  • Nio要去解决的问题

  Nio要解决的问题网上的解释一大堆,诸如银行取号、餐厅点餐等等。这些列子就不再具体地重复了,实际上就是为了使用现有的资源提供更高的生产效率。

  这让我想起了以前学习政治的时候课本里的故事,资本家为了赚取更多的剩余价值往往会想方设法提高生产效率。如何提高呢?举个简单例子,一个汽车生产厂商有若干条生产线(一条生产线负责汽车制造的所有环节),每个生产线都有相同的工人数目,每个工人都负责一个生产环节,也就是说生产发动机和生产轮胎的工人数目是一样的,但是很明显生产发动机需要的时间肯定比轮胎要长很多,那么在每一条生产线上生产发动机的那个工人往往满负荷工作,而生产轮胎的工人却很闲,这样生产效率很低。因此厂家打破了这种一条生产线生产汽车所有环节的模式,改为一个汽车零部件一条生产线,那么在发动机生产线雇佣的工人数目一定多于轮胎生产线,这样每条生产线的工人都不会闲着,通过资源的合理分配最大化利用了工人的价值,提高了生产效率,赚取了剩余价值。

  而如何通过资源合理分配来提高生产效率就是nio在计算机io领域要解决的问题。

  • 同步/异步、阻塞/非阻塞

  同步和异步是针对应用程序和内核的交互而言的:

  同步指的是用户进程触发IO操作并等待或者轮询的去查看IO操作是否就绪;异步是指用户进程触发IO操作以后便开始做自己的事情,而当IO操作已经完成的时候会得到IO完成的通知。

 

  阻塞和非阻塞是针对于进程在访问数据的时候,根据IO操作的就绪状态来采取的不同方式,说白了是一种读取或者写入操作函数的实现方式:

  阻塞方式下读取或者写入函数将一直等待;非阻塞方式下,读取或者写入函数会立即返回一个状态值。

  • Reactor(反应器)模式:

  

  Reactor模式应用于同步IO场景,事件分离者等待某个事件的发生或者可操作状态的变化(比如文件描述符可读写,或者是socket可读写),事件分离者就把这个事件传给事先注册的事件处理函数或者回调函数,由后者来做实际的读写操作。

     nio中的reactor模式

 

 

  在Reactor模式中有如下几个角色:

  1. Reactor    

  事件分离者,其核心是Selector,负责响应IO事件,一旦发生,广播发送给相应的Handler去处理。具体为一个Selector和一个ServerSocketChannel。ServerSocketChannel注册到Selector中,获取的SelectionKey绑定一个Acceptor(也可以理解为一个Handler)。

  参考代码如下:

 1 import java.io.IOException; 2 import java.net.InetAddress; 3 import java.net.InetSocketAddress; 4 import java.nio.channels.SelectionKey; 5 import java.nio.channels.Selector; 6 import java.nio.channels.ServerSocketChannel; 7 import java.util.Iterator; 8 import java.util.Set; 9 10 /**11  * 反应器模式 用于解决多用户访问并发问题12 */13 public class Reactor implements Runnable {14   public final Selector selector;15   public final ServerSocketChannel serverSocketChannel;16 17   public Reactor(int port) throws IOException {18     selector = Selector.open();19     serverSocketChannel = ServerSocketChannel.open();20     InetSocketAddress inetSocketAddress = new InetSocketAddress(InetAddress.getLocalHost(), port);21     serverSocketChannel.socket().bind(inetSocketAddress);22     serverSocketChannel.configureBlocking(false);23 24     // 向selector注册该channel25     SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);26 27     // 利用selectionKey的attache功能绑定Acceptor 如果有事情,触发Acceptor28     selectionKey.attach(new Acceptor(this));29   }30 31   @Override32   public void run() {33     try {34       while (!Thread.interrupted()) {35         selector.select();36         Set<SelectionKey> selectionKeys = selector.selectedKeys();37         Iterator<SelectionKey> it = selectionKeys.iterator();38         // Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。39         while (it.hasNext()) {40           // 来一个事件 第一次触发一个accepter线程,SocketReadHandler41           SelectionKey selectionKey = it.next();42           dispatch(selectionKey);43           selectionKeys.clear();44         }45       }46     } catch (IOException e) {47       e.printStackTrace();48     }49   }50 51   /**52    * 运行Acceptor或SocketReadHandler53    * 54    * @param key55   */56   void dispatch(SelectionKey key) {57     Runnable r = (Runnable) (key.attachment());58     if (r != null) {59       r.run();60     }61   }62 63 }

View Code

  2. Acceptor

  也可以理解为一个Handler,这个Handler只负责创建具体处理IO请求的Handler,如果Reactor广播时SelectionKey创建一个Handler负责绑定相应的SocketChannel到Selector中。下次再次有IO事件时会调用对用的Handler去处理。

  参考代码如下:

 1 import java.io.IOException; 2 import java.nio.channels.SocketChannel; 3  4 public class Acceptor implements Runnable { 5   private Reactor reactor; 6  7   public Acceptor(Reactor reactor) { 8     this.reactor = reactor; 9   }10 11   @Override12   public void run() {13     try {14       SocketChannel socketChannel = reactor.serverSocketChannel.accept();15       if (socketChannel != null){16         // 调用Handler来处理channel17         new SocketReadHandler(reactor.selector, socketChannel);18       }        19     } catch (IOException e) {20       e.printStackTrace();21     }22   }23 }

View Code

  3. Handler

  具体的事件处理者,例如ReadHandler、SendHandler,ReadHandler负责读取缓存中的数据,然后再调用一个工作处理线程去处理读取到的数据。具体为一个SocketChannel,Acceptor初始化该Handler时会将SocketChannel注册到Reactor的Selector中,同时将SelectionKey绑定该Handler,这样下次就会调用本Handler。

  参考代码如下:

 1 import java.io.IOException;  2 import java.nio.ByteBuffer;  3 import java.nio.channels.SelectionKey;  4 import java.nio.channels.Selector;  5 import java.nio.channels.SocketChannel; 6  7 public class SocketReadHandler implements Runnable { 8   private SocketChannel socketChannel; 9 10   public SocketReadHandler(Selector selector, SocketChannel socketChannel) throws IOException {11     this.socketChannel = socketChannel;12     socketChannel.configureBlocking(false);13 14     SelectionKey selectionKey = socketChannel.register(selector, 0);15 16     // 将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法。17     // 参看dispatch(SelectionKey key)18     selectionKey.attach(this);19 20     // 同时将SelectionKey标记为可读,以便读取。21     selectionKey.interestOps(SelectionKey.OP_READ);22     selector.wakeup();23   }24 25   /**26    * 处理读取数据27   */28   @Override29   public void run() {30     ByteBuffer inputBuffer = ByteBuffer.allocate(1024);31     inputBuffer.clear();32     try {33       socketChannel.read(inputBuffer);34       // 激活线程池 处理这些request35       // requestHandle(new Request(socket,btt));36     } catch (IOException e) {37       e.printStackTrace();38     }39   }40 }

View Code

  综上所述,我们可以把上述角色的细化理解为将一份工作的工作量细化了,或者说重新分配了一下,专人做专事,这样最大化利用了每个角色的价值,提高工作效率。