你的位置:首页 > Java教程

[Java教程]Java NIO 基础


Java在JDK1.4中引入了 java.nio 类库,为Java进军后端Server和中间件开发打开了方便之门。

一般而言,这里的 nio 代表的是 New I/O,但是从实质上来说,我们可以将其理解成:NonBlocking I/O(非阻塞)。

java.nio 的核心的内容有:Buffer、Channel(SelectableChannel)、Selector。三者紧密配合,是实现异步非阻塞Server端开发的关键所在。任何想要从事Java后端Server和中间件开发的Java程序员,都应该深入的掌握Java NIO。下面分别介绍。

1. java.nio.Buffer

1.1 Buffer的本质

Buffer本质上是:一个容器(数组实现)。一个初始化之后,大小就固定了的,底层用数组实现的,存储原始类型的容器(byte、char、short、int、long、float、double)。每一个原始类型(boolean除外)对应一个子类,但是我们一般使用的都是 ByteBuffer,因为OS网络层最终传输的是字节byte:

也就是说用java.nio编写非阻塞网络程序,在通过 Channel 发送和接受网络字节,一般都是使用 ByteBuffer,作为容器(数组)来暂时存储的。

1.2 Buffer 的核心属性:

  // Invariants: mark <= position <= limit <= capacity  private int mark = -1;  private int position = 0;  private int limit;  private int capacity;

capacity: 描述底层数组的大小,初始化之后就一直不变。没有什么好说的。最重要的是 position 和 limit。

它们描述的都是一个数组内部的 index,也就是数组的一个位置或者数组元素。

position: 的值是下一个即将被 写/读 的数组元素的位置;随着向容易读/写操作的发生,position不断的递增;

limit: 的值是 数组元素 被 写/读 的最大界限。也就是第一个不能被读/写的数组元素位置;

position 的存在我们很好理解,那么为什么需要一个 limit 属性呢?

这是因为,Buffer 有时是 既能写,又能读的。比如我先向Buffer中写8个byte,然后我要读取这 8 个 byte,该怎么办呢?

第一步,很定需要 将 position 从 8 变成0,因为我们要从 0 的位置,开始读取;

第二步,从0开始读取,那么读到数组的那个位置呢?这个就是 limit 实现的功能,我们用 limit = 8 来描述读取操作的最大界限。

也就是 Buffer.flip() 函数实现的功能:

  public final Buffer flip() {    limit = position;    position = 0;    mark = -1;    return this;  }

可以看到,我们写了8个字节之后,position = 8 ,然后我们 调用 flip() 函数,那么 limit = 8, position=0; 所以后面的读取操作,就是从 0 读取到 8了。

所以实现了 从 Buffer 的写 ,到读取 Buffer 的转换。(可能熟悉Netty的知道,Netty中实现了一个 ByteBuf 的类对java.nio.ByteBuffer进行了简化)

mark: 该属性是为了实现,暂时 记住 目前的 position,然后我去进行一些 读/写 操作(position会发生变化),完成之后,我调用 reset() 函数,可以实现将positon变回前面记住的那个位置。

  public final Buffer mark() {    mark = position;    return this;  }  public final Buffer reset() {    int m = mark;    if (m < 0)      throw new InvalidMarkException();    position = m;    return this;  }

所以 mark 属性,mark() 函数,reset() 函数,它们是一组的,配合使用

1.3 区分 Buffer 的三个函数:

clear() 函数、flip() 函数、rewind()函数:

  public final Buffer flip() {    limit = position;    position = 0;    mark = -1;    return this;  }  public final Buffer rewind() {    position = 0;    mark = -1;    return this;  }  public final Buffer clear() {    position = 0;    limit = capacity;    mark = -1;    return this;  }

三个都将 position=0; 但是 flip() 和 clear() 同时还设置了 limit = postion; limit=capacity; 但 rewind() 没有修改 limit.

flip() 是为了实现Buffer的读/写转换的。rewind() 和 clear() 都是实现Buffer从 0 开始读/写,只不过一个修改了limit, 一个没有。

1.4 rewind() 与 compact()

  public ByteBuffer compact() {    System.arraycopy(hb, ix(position()), hb, ix(0), remaining());    position(remaining());    limit(capacity());    discardMark();    return this;  }

compact() 函数实现的是,比如我读取一个 Buffer, 但是没有读完,然后又要开始写,此时,我们可以将已经读取的抛弃掉,将占有的空间释放出来,将没有读完的移动到Buffer从 0 开始的数组位置上,也就是:System.arraycopy(hb, ix(position()), hb, ix(0), remaing()); 实现的copy功能。

remaiing() 返回还剩余的字节数目。position(remaining()) 重新定位,limit(capacity())重新设置;

显然这样做是 为了有更大的空间来容纳下一次读取到的内容。因为抛弃了已经读取过了的内容了,释放了其占有的空间。

1.5 ByteBuffer 的读(get)和写(put)

ByteBuffer的读和写可以分为相对位置的、绝对位置的两种;同时又可以分为单字节的、批量的读写

get() 函数就是相对位置的单字节读取,它先读取position位置的字节,然后 position++;

put(byte b) 函数是相对位置的写单字节,它先进行写操作,然后 position++;

get(int index) 是读取绝对位置 index 处的单字节,position不变;

put(int index, byte b) 是绝对位置 index 的写写但自己操作,position不变;

get(byte[] dst, int offset, int length) 是批量读取操作,注意是将 ByteBuffer 中的字节读取到 dst中,不是读取到Bytebuffer中;

  public ByteBuffer get(byte[] dst, int offset, int length) {    checkBounds(offset, length, dst.length);    if (length > remaining())      throw new BufferUnderflowException();    int end = offset + length;    for (int i = offset; i < end; i++)      dst[i] = get();    return this;  }

可以看到 参数 length 不能大于 remaining(),也就是 length 不能大于ByteBuffer中剩余字节数;所以一般如下使用:

int len = buffer.remaining();

buffer.get(dst,0, len);

get(byte[] dst) 等价于:get(dst, 0, dst.length) 

相对于批量写,也有对应的批量读:

put(byte[] src, int offset, int length)

一样的, length 不能大于 buffer 剩余字节数;

put(byte[] src) 等价于 put(src, 0, src.length)

对于写,还有一个 实现 两个 ByteBuffer 之间读写的put函数:

  public ByteBuffer put(ByteBuffer src) {    if (src == this)      throw new IllegalArgumentException();    if (isReadOnly())      throw new ReadOnlyBufferException();    int n = src.remaining();    if (n > remaining())      throw new BufferOverflowException();    for (int i = 0; i < n; i++)      put(src.get());    return this;  }

可以看到,源 ByteBuffer 的剩余的字节数,不能大于目的ByteBuffer的剩余空间,不然就溢出了。

1.6 ByteBuffer 的分配(存储空间初始化),堆内、堆外内存

ByteBuffer.allocate(1024);
ByteBuffer.allocateDirect(1024);

一个表示在JVM堆上进行分配内存空间,一个使用的是JVM堆外的内存空间:

  public static ByteBuffer allocate(int capacity) {    if (capacity < 0)      throw new IllegalArgumentException();    return new HeapByteBuffer(capacity, capacity);  }

  HeapByteBuffer(int cap, int lim) {    super(-1, 0, lim, cap, new byte[cap], 0);    /*    hb = new byte[cap];    offset = 0;    */  }

可以看到使用的是 HeapByteBuffer,而后者就是直接 new byte[cap]; 显然就是在JVM堆上分配的内存;

而直接分配:

  public static ByteBuffer allocateDirect(int capacity) {    return new DirectByteBuffer(capacity);  }
DirectByteBuffer(int cap) { // package-private super(-1, 0, cap, cap); boolean pa = VM.isDirectMemoryPageAligned(); int ps = Bits.pageSize(); long size = Math.max(1L, (long)cap + (pa ? ps : 0)); Bits.reserveMemory(size, cap); long base = 0; try { base = unsafe.allocateMemory(size); } catch (OutOfMemoryError x) { Bits.unreserveMemory(size, cap); throw x; } unsafe.setMemory(base, size, (byte) 0); if (pa && (base % ps != 0)) { // Round up to page boundary address = base + ps - (base & (ps - 1)); } else { address = base; } cleaner = Cleaner.create(this, new Deallocator(base, size, cap)); att = null; }

通过判断 对齐,计算出 正确的size,然后调用 unsafe.allocateMemory(size); 进行堆外分配内存,调用 unsafe.setMemory(base, size, (byte)0) 将内存清0.

而且 cleaner = Cleaner.create(this, new Deallocator(base, size, cap)); 

Cleaner 内部中通过一个列表,维护了一个针对每一个 directBuffer 的一个回收堆外内存的 线程对象(Runnable):

  private static class Deallocator implements Runnable  {    private static Unsafe unsafe = Unsafe.getUnsafe();    private long address;    private long size;    private int capacity;    private Deallocator(long address, long size, int capacity) {      assert (address != 0);      this.address = address;      this.size = size;      this.capacity = capacity;    }    public void run() {      if (address == 0) {        // Paranoia        return;      }      unsafe.freeMemory(address);      address = 0;      Bits.unreserveMemory(size, capacity);    }  }

回收操作发生在 Cleaner 的 clean() 方法中。

一般而且,allocate() 函数运行极快,因为JVM堆内存已经从OS中分配好了;而directAllocate() 分配要慢一些,因为需要从OS中直接分配;但是前者是JVM堆内内存,会受到GC的影响,而后者是堆外内存,不收GC的影响;所以后者适合于:事先分配好的,然后会对内存进行重复利用的,大型内存管理的需要;也就是大型Server端程序或者中间件的场景。

The contents of direct buffers may reside outside of the normal garbage-collected heap, and so their impact upon the memory footprint of an
application might not be obvious. It is therefore recommended that direct buffers be allocated primarily for large, long-lived buffers that
are subject to the underlying system's native I/O operations. In general it is best to allocate direct buffers only when they yield a
measureable gain in program performance.

在Mycat中间件中就使用到了直接分配的堆外内存,来处理网络字节。

另外 Bytebuffer.hasRemaining(); 和 Bytebuffer.remaining(); 函数可以判断:是否有空间可写 / 是否有字节可读;以及空间数量。

2. Channel(SelectableChannel)

Java实现非阻塞多路复用网络编程的第二个重要的组件就是 SelectableChannel,selectable 的意思是该 channel 可以注册到 Selector 对象上,从而实现多路复用。网络Server中,使用的最多的就是:SocketChannel、ServerSocketChannel、DatagramChannel(它们对应的阻塞网络编程类分别为:Socket、ServerSocket、DatagramSocket):

ByteBuffer 只是字节的容器,而字节的发送和接收都是通过 channel 来完成的。而 Selector 的作用就是对注册在其上的所有 channel 进行就绪通知

很显然SocketChannel、ServerSocketChannel分别用于TCP协议的client端和server端,而DatagramChannel用于udp协议。

2.1 SocketChannel

public abstract class SocketChannel extends AbstractSelectableChannel  implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel

SocketChannel extends AbstractSelectableChannel,所以它可以注册到 Selector 上;ByteChannel接口是可读可写的,也就是“双工的”;

SocketChannel(ServerSocketChannel) 有别于 Socket(ServerSocket) 的主要在于两点:SocketChannel 可以注册到 Selector上 ; SocketChannel 是可以配置成非阻塞的。有了这两点,就可以编写非阻塞多路复用网络程序,避免过去那种每个连接开一个线程的编程方式,省去大量线程上下文切换时的开销以及线程的相关开销

1)block 相关的API:

public final boolean isBlocking()public final Object blockingLock()

isBlocking() 测试 SocketChannel 是否是阻塞的;blockingLock() 作用是当调用了 configureBlocking() 之后,防止被修改。调用 blockingLock() 之后,只有持有返回值的线程能够修改SocketChannel的阻塞状态。

2)connect 相关的API:

boolean connect(SocketAddress remote) boolean isConnected() boolean isConnectionPending() boolean finishConnect() 

connect() 函数开始发起链接;

isConnected() 是否已经链接上了;

isConnectionPending() 连接正则进行中;

finishConnect() client 请求完成TCP连接过程;

3)如何建立到Server端的连接

关于 connect() 方法:

If this channel is in non-blocking mode then an invocation of this method initiates a non-blocking connection operation. If the connection is established immediately, as can happen with a local connection, then this method returns true. Otherwise this method returns false and the connection operation must later be completed by invoking the finishConnect method

如果是非阻塞模式,并且连接建立立即完成了,那么 connect() 方法返回true;否则 connect() 返回false,表示连接不能立即完成,需要后面再调用 finishConnect() 方法才能完成连接的建立过程

那么后面调用 finishConnect() 是在什么时候,哪里进行调用呢?

这里需要先介绍一下连接的流程一般是:

SocketChannel channel = SocketChannel.open();  // 获得一个实例

channel.configureBlocking(false);  // 配置非阻塞

channel.register(selector, SelectionKey.OP_CONNECT, att);  // 注册到selector,关注事件是:SelectionKey.OP_CONNECT

channel.connect(new InetSocketAddress("192.168.1.3", 3306));  // 发起连接

之后 selector 调用 select() 重新获取注册在其上的 channel 的ready状态,然后遍历其key set:

Set<SelectionKey> keys = selector.selectedKeys();

for (SelectionKey key : keys) {	if (key.isValid() && key.isConnectable()) {

        if (channel.isConnectionPending()) 
          try{

            channel.finishConnect();

          }catch(Exception e){

          // ....

          }         

          // ...

	    } else {		  key.cancel();	  }
   }}

这样才能完成 connect 连接的建立过程。

这里一定要搞清楚:SelectionKey.isConnectable() 

  public final boolean isConnectable() {    return (readyOps() & OP_CONNECT) != 0;  }

SelectionKey.OP_CONNECT = 8

Suppose that a selection key's interest set contains OP_CONNECT at the start of a selection operation. If the selector detects that the corresponding socket channel is ready to complete its connection sequence, or has an error pending, then it will add OP_CONNECT to the key's ready set and add the key to its selected-key set

在连接开始的阶段,如果 intereset set 包含了 OP_CONNECT, 那么如果 selector.select() 调用发现 channel 将要 完成,或者发生错误pending了,那么就会在ready set(readyOps()返回值) 中加入 OP_CONNECT. 所以上面才能那样调用。

finishConnect()方法

Finishes the process of connecting a socket channel.

A non-blocking connection operation is initiated by placing a socket channel in non-blocking mode and then invoking its connect method. Once the connection is established, or the attempt has failed, the socket channel will become connectable and this method may be invoked to complete the connection sequence. If the connection operation failed then invoking this method will cause an appropriate java.io.IOException to be thrown.

If this channel is already connected then this method will not block and will immediately return true. If this channel is in non-blocking mode then this method will return false if the connection process is not yet complete.

如果没有 连接没有完成,那么会返回 false,那么下一次 selector.select() 之后的 重新遍历 key set,会重复调用一次 finishConnect() 方法....

channel 对象可以保存在 att 对象中。然后通过 key 再获得 att 对象。

2.2 ServerSocketChannel

服务端的非阻塞处理过程:

selector = Selector.open();serverChannel = ServerSocketChannel.open();serverChannel.configureBlocking(false);
serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 1024 * 16 * 2);serverChannel.bind(new InetSocketAddress(bindIp, port), 100); serverChannel.register(selector, SelectionKey.OP_ACCEPT);

serverChannel 注册到 selector,关注的是 OP_ACCEPT事件,也就是当有 client SocketChannel连接上了的事件。

selector.select(1000L);Set<SelectionKey> keys = selector.selectedKeys();try {	for (SelectionKey key : keys) {		if (key.isValid() && key.isAcceptable()) {

          channel = serverChannel.accept();
          channel.configureBlocking(false);

          channel.register() ....

          // .... 连接过程完成

		} else {			key.cancel();		}	}} finally {    keys.clear();}

serverChannel 就在一个线程中不断的 循环上面的过程:

如果有 client SocketChannel 连接上了,那么就 accept() 返回一个 channel, 然后使用该 channel 和 client 的socketChannel 完成后续的网络字节读取和发送操作。

3. Selector

Selector 完成IO多路复用的关键。Selector 管理所有被注册到该selector上的 SocketChannel/ServerSocketChannel. 为它们提供就绪通知服务。每一次有任何一个channel关注的IO实际就绪了,那么selector.select()就更新其 key set, 并且冲阻塞中返回,然后我们就可以遍历key set,处理该IO事件。

和 Selector密切配合的是 SelectionKey对象。

 

3.1 selector相关API:

Selector.open()selector.select()selector.close()selector.keys()selector.selectedKeys()
selector.wakeup()

Selector.open() 初始化一个 selector实例。

select() 方法有多种形似

select() 会一直阻塞,直到有至少一个channel关注的事件就绪了。select(long timeout),超时也会返回。selectNow()立即返回。

这里一定要区分 keys() 和 selectedKeys() 方法

前者返回的是在该 selector 注册过的 channel 的 key 的集合;而后者是已经选择的 channel 的key 的集合,也就是其 channel 的关注的事件就绪了。

wakeup() 可以唤醒正处于阻塞的 select()方法,使其立即方法。

(有点虎头蛇尾的感觉,感觉越写越多...算了,就写到这里...)