你的位置:首页 > Java教程

[Java教程]事件驱动的Reactor


Reactor,真是一个不好理解的名字。

关于Reactor与Preactor,网络中有大量的文章。Mina中采用了Reactor方式处理NIO。那到底什么是Reactor呢?

         Reactor是一个事件驱动的模型。先不说什么是事件驱动的设计模型,先来看看事件处理机制。

事件处理

1、常用事件处理模型

 事件处理机制,大家应该都不陌生。

 

结构很简单,事件源拥有多个listener,当有事件时,只需要通知每个Listener做相应处理就OK了。

void handle(Event event){  for(Listener listener: listeners){    listener.doHandle(event);  }}

 

在Spring的事件模型中,会稍微的做了个改动: 

 

 

事件发布,其实只是将上面代码中的for 循环控制拿到了EventPublish#publishEvent中了而已。

 

 

2、事件驱动的编程模型

  对于常用的事件处理模型,不管它怎么变,怎么玩,都是针对一个事件源与事件监听者(事件处理者)之间的编程。

     如果要让你来管理一堆的事件源,及其事件处理呢?我们都会想到,先为每个事件对象注册上事件及其处理。当 事件发生时,根据调度来处理就行了。例如AWT、Swing、以及JavaScript中都是这样一个机制。

     如果事件源源不断的发生呢?事先我们根本无法知道有多少事件源出现,有多少事件发生。最觉的当是TCP连接过程中的事件了。

     

 

这个图的意思是:

         客户访问服务器,由acceptor接收事件,接收到的事件经过Reactor调度进行处理。

这是一个单线程的处理模型。

 

对这一个模型实现如下:

package com.fjn.java.nio.reactor.sample;import java.util.Queue;import java.util.concurrent.LinkedBlockingDeque;public class EventCenter {  public static final String EVENT_READ = "read";  public static final String EVENT_WRITE = "write";  Queue<Event> eventQueue = new LinkedBlockingDeque<EventCenter.Event>(10);  protected void startupAcceptEvents() {        // Acceptor 线程,不断的接收事件,并将事件注册,放到eventQueue中    new Thread(new Runnable() {      @Override      public void run() {        try {          for (;;) {            Thread.sleep(500);            // 接收到一个事件            Source source = new Source();            registerHandler(source, EVENT_READ);          }        } catch (InterruptedException | InstantiationException            | IllegalAccessException e) {          e.printStackTrace();        }      }    }, "Acceptor").start();  }  protected void startupDispatch() throws InterruptedException {        // Dispatcher 线程,从eventQueue中取出事件,然后由Reactor进行调度    new Thread(new Runnable() {      @Override      public void run() {        Event event = null;        for (;;) {          event = eventQueue.poll();          if (event != null) {            System.out.println(Thread.currentThread().getName()                + ":\thandle a " + event.type + " event");            try {              dispatch(event);            } catch (Exception e) {              e.printStackTrace();            }          }          try {            Thread.sleep(500);          } catch (InterruptedException e) {            e.printStackTrace();          }        }      }    }, "Processor").start();  }  protected void dispatch(Event event) throws Exception {    event.source.handler.handle();  }  public void registerHandler(Source source, String eventType)      throws InstantiationException, IllegalAccessException {    // 动态注册事件处理    EventHandler handler = null;    switch (eventType) {    case EVENT_READ:      handler = EventCenter.this.new ReadEventHandler();      break;    case EVENT_WRITE:      handler = EventCenter.this.new WriteEventHandler();      break;    default:      break;    }    handler.source = source;    source.handler = handler;    Event newEvent = new Event();    newEvent.type = eventType;    newEvent.source = source;    eventQueue.offer(newEvent);    System.out.println(Thread.currentThread().getName()        + ":\tregiester a eventType event ");  }  public static void main(String[] args) throws InterruptedException {    EventCenter center = new EventCenter();    center.startupAcceptEvents();    center.startupDispatch();    Thread.sleep(3 * 60 * 1000);  }  public static abstract class EventHandler {    public Source source;    public abstract void handle() throws Exception;  }  public static class Source {    private EventHandler handler;  }  public static class Event {    public String type;    public Source source;  }  public class ReadEventHandler extends EventHandler {    @Override    public void handle() throws Exception {      System.out.println("reading ...");      System.out.println("computeing...");      EventCenter.this.registerHandler(source, EVENT_WRITE);    }  }  public class WriteEventHandler extends EventHandler {    @Override    public void handle() {      System.out.println("begin writing ...");    }  }}

 


          上面的实现中使用了Queue来暂存event。由两个线程Acceptor、Dispatcher分别用于接收事件、调度事件处理。

 执行结果:

Acceptor:	regiester a eventType event Processor:	handle a read eventreading ...computeing...Processor:	regiester a eventType event Acceptor:	regiester a eventType event Processor:	handle a write eventbegin writing ...Acceptor:	regiester a eventType event Processor:	handle a read eventreading ...computeing...Processor:	regiester a eventType event Acceptor:	regiester a eventType event Processor:	handle a read eventreading ...computeing...Processor:	regiester a eventType event Acceptor:	regiester a eventType event Processor:	handle a write eventbegin writing ...Acceptor:	regiester a eventType event Processor:	handle a read eventreading ...computeing...Processor:	regiester a eventType event Acceptor:	regiester a eventType event Processor:	handle a write eventbegin writing ...Acceptor:	regiester a eventType event 

View Code

 

 

3、使用事件驱动的编程模型来处理NIO

 

在NIO编程中,由ServerSocketChannel接收到一个SocketChannel后,会将其注册为OP_READ,当需要进行写操作时,又会将其注册为OP_WRITE。

         在OP_READ、OPWRITE时,对SocketChannel的处理又不一样。所以在对NIO的处理过程中,可以使用事件驱动方式。如果将SocketChannel看作是事件源(Source)。将OP_READ、OP_WRITE看作是事件(Event),将EventCenter看作是一个Reactor。

 

那么类比上面的写法,处理NIO请求就写成了:

package com.fjn.java.nio.reactor.nioReactor;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.CharBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.charset.Charset;import java.util.Iterator;public class Reactor {  Selector selector;  ServerSocketChannel serverSocket;  public void init() throws IOException {    serverSocket = ServerSocketChannel.open();    serverSocket.bind(new InetSocketAddress(8080));    serverSocket.configureBlocking(false);    selector = Selector.open();  }  private void startupAcceptEvents() throws IOException, InterruptedException {    new Thread(new Acceptor()).start();  }  private void startupDispatch() {    new Thread(new Dispatcher()).start();  }  private void registerHandler(SocketChannel socket, int operates)      throws IOException {    socket.configureBlocking(false);    SelectionKey key = socket.register(selector, operates);    Handler handler = new Handler();    handler.key = key;    handler.socket = socket;    key.attach(handler);  }  private void dispatch(SelectionKey key) {    Runnable handler = (Runnable) key.attachment();    if (handler != null) {      handler.run();    }  }  public static void main(String[] args) throws IOException,      InterruptedException {    Reactor reactor = new Reactor();    reactor.init();    reactor.startupAcceptEvents();    reactor.startupDispatch();  }  public class Handler implements Runnable {    SocketChannel socket;    SelectionKey key;    @Override    public void run() {      if (socket != null && key != null) {        if (key.isReadable()) {          System.out.println("reading ...");          try {            ByteBuffer buffer = ByteBuffer.allocate(2048);            socket.read(buffer);            buffer.flip();            CharBuffer charBuffer = Charset.forName("UTF-8")                .newDecoder().decode(buffer);            String request = charBuffer.toString();            System.out.println(request);            System.out.println("computeing...");            Reactor.this.registerHandler(socket,                SelectionKey.OP_WRITE);          } catch (IOException e) {            e.printStackTrace();          }        } else if (key.isWritable()) {          System.out.println("write");          ByteBuffer buffer = ByteBuffer.allocate(1024);          buffer.put("ok".getBytes());          buffer.flip();          try {            int operate = key.interestOps();            operate &= ~SelectionKey.OP_WRITE;            key.interestOps(operate);            socket.write(buffer);            socket.close();          } catch (IOException e) {            e.printStackTrace();            key.cancel();          }        }      }    }  }  public class Dispatcher implements Runnable {    @Override    public void run() {      for (;;) {        try {          int count = selector.select(500);          if (count > 0) {            Iterator<SelectionKey> iter = selector.selectedKeys()                .iterator();            while (iter.hasNext()) {              SelectionKey key = iter.next();              dispatch(key);              iter.remove();            }            selector.selectedKeys().clear();          }        } catch (IOException e) {          e.printStackTrace();        }      }    }  }  public class Acceptor implements Runnable {    @Override    public void run() {      for (;;) {        SocketChannel socket;        try {          socket = serverSocket.accept();          if (socket != null) {            registerHandler(socket, SelectionKey.OP_READ);          }          Thread.sleep(100);        } catch (Exception e) {          e.printStackTrace();        }      }    }  }}

 

然后使用浏览器作为客户端进行测试,结果如下:

服务端:

 

客户端:

 

 

上面这就是基于事件处理模型在NIO编程中的实现。