你的位置:首页 > 软件开发 > Java > Netty(一)——Netty入门程序

Netty(一)——Netty入门程序

发布时间:2017-11-09 16:00:41
转载请注明出处  前提:电脑上已经安装了JDK1.7并配置了JDK的环境变量path。  从Netty官网下载Netty最新安装包,解压。  这时会发现里面包含了各个模块的.jar包和源码,由于我们直接以二进制类库的方式使用Netty,所以只需要netty-all-4.1.15. ...

Netty(一)——Netty入门程序

转载请注明出处  前提:电脑上已经安装了JDK1.7并配置了JDK的环境变量path。

  从Netty官网下载Netty最新安装包,解压。

Netty(一)——Netty入门程序

  这时会发现里面包含了各个模块的.jar包和源码,由于我们直接以二进制类库的方式使用Netty,所以只需要netty-all-4.1.15.Final.jar即可。

Netty(一)——Netty入门程序

  新建Java工程,引入netty-all-4.1.15.Final.jar。

2.Netty服务端开发

  在使用Netty开发TimeServer之前,先回顾一下使用NIO进行服务端开发的步骤。

  1. 创建ServerSocketChannel,配置它为非阻塞模式;
  2. 绑定监听,配置TCP参数,例如backlog大小;
  3. 创建一个独立的I/O线程,用于轮询多路复用器Selector;
  4. 创建Selector,将之前创建的ServerSocketChannel注册到Selector上,监听SelectionKey.ACCEPT;
  5. 启动I/O线程,在循环体中执行Selector.select()方法,轮询就绪的Channel;
  6. 当轮询到了就绪状态的Channel时,需要对其进行判断,如果是OP_ACCEPT状态,说明是新的客户端接入,则调用ServerSocketChannel.accept()方法接受新的客户端;
  7. 设置新接入的客户端链路SocketChannel为非阻塞模式,配置其他的一些TCP参数;
  8. 将SocketChannel注册到Selector,监听OP_READ操作位;
  9. 如果轮询的Channel为OP_READ,则说明SocketChannel中有新的就绪的数据包需要读取,则构造ByteBuffer对象,读取数据包;
  10. 如果轮询的Channel为OP_WRITE,说明数据还没有发送完成,需要继续发送。

  一个简单的NIO服务端程序,如果需要我们直接使用JDK的NIO类库进行开发,竟然需要经过繁琐的十多步操作才能完成最基本的消息读取和发送,这也是我们选择Netty等NIO框架的原因了,下面我们看看使用Netty是如何轻松搞定服务端开发的。

package joanna.yan.netty;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;public class TimeServer {  public static void main(String[] args) throws Exception {  int port=9090;  if(args!=null&&args.length>0){   try {    port=Integer.valueOf(args[0]);   } catch (Exception e) {    // 采用默认值   }  }  new TimeServer().bind(port); }  public void bind(int port) throws Exception{  /*   * 配置服务端的NIO线程组,它包含了一组NIO线程,专门用于网络事件的处理,实际上它们就是Reactor线程组。   * 这里创建两个的原因:一个用于服务端接受客户端的连接,   * 另一个用于进行SocketChannel的网络读写。   */  EventLoopGroup bossGroup=new NioEventLoopGroup();  EventLoopGroup workerGroup=new NioEventLoopGroup();  try {   //ServerBootstrap对象,Netty用于启动NIO服务端的辅助启动类,目的是降低服务端的开发复杂度。   ServerBootstrap b=new ServerBootstrap();   b.group(bossGroup, workerGroup)    .channel(NioServerSocketChannel.class)    .option(ChannelOption.SO_BACKLOG, 1024)    /*    * 绑定I/O事件的处理类ChildChannelHandler,它的作用类似于Reactor模式中的handler类,    * 主要用于处理网络I/O事件,例如:记录日志、对消息进行编解码等。    */    .childHandler(new ChildChannelHandler());   /*    * 绑定端口,同步等待成功(调用它的bind方法绑定监听端口,随后,调用它的同步阻塞方法sync等待绑定操作完成。    * 完成之后Netty会返回一个ChannelFuture,它的功能类似于JDK的java.util.concurrent.Future,    * 主要用于异步操作的通知回调。)    */   ChannelFuture f=b.bind(port).sync();      //等待服务端监听端口关闭(使用f.channel().closeFuture().sync()方法进行阻塞,等待服务端链路关闭之后main函数才退出。)   f.channel().closeFuture().sync();  }finally{   //优雅退出,释放线程池资源   bossGroup.shutdownGracefully();   workerGroup.shutdownGracefully();  } }  private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{  @Override  protected void initChannel(SocketChannel arg0) throws Exception {   arg0.pipeline().addLast(new TimeServerHandler());  }   }}
package joanna.yan.netty;import java.sql.Date;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;/** * 用于对网络事件进行读写操作 * @author Joanna.Yan * @date 2017年11月8日下午4:15:13 *///public class TimeServerHandler extends ChannelHandlerAdapter{//已摒弃public class TimeServerHandler extends ChannelInboundHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg)   throws Exception {  //ByteBuf类似于JDK中的java.nio.ByteBuffer对象,不过它提供了更加强大和灵活的功能。  ByteBuf buf=(ByteBuf) msg;  byte[] req=new byte[buf.readableBytes()];  buf.readBytes(req);  String body=new String(req, "UTF-8");  System.out.println("The time server receive order : "+body);  String currentTime="QUERY TIME ORDER".equalsIgnoreCase(body) ? new     Date(System.currentTimeMillis()).toString() : "BAD ORDER";  ByteBuf resp=Unpooled.copiedBuffer(currentTime.getBytes());  ctx.write(resp); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  /*   * ctx.flush();将消息发送队列中的消息写入到SocketChannel中发送给对方。   * 从性能角度考虑,为了防止频繁地唤醒Selector进行消息发送,Netty的write方法并不直接将消息写入SocketChannel中,   * 调用write方法只是把待发送的消息放到发送缓冲数组中,再通过调用flush方法,将发送缓冲区中的消息全部写到SocketChannel中。   */  ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)   throws Exception {  ctx.close(); } }

3.Netty客户端开发

package joanna.yan.netty;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;public class TimeClient { public static void main(String[] args) throws Exception {  int port=9019;  if(args!=null&&args.length>0){   try {    port=Integer.valueOf(args[0]);   } catch (Exception e) {    // 采用默认值   }  }  new TimeClient().connect(port, "127.0.0.1"); }  public void connect(int port,String host) throws Exception{  //配置客户端NIO线程组  EventLoopGroup group=new NioEventLoopGroup();    try {   Bootstrap b=new Bootstrap();   b.group(group)   .channel(NioSocketChannel.class)   .option(ChannelOption.TCP_NODELAY, true)   .handler(new ChannelInitializer<SocketChannel>() {    @Override    protected void initChannel(SocketChannel ch) throws Exception {     ch.pipeline().addLast(new TimeClientHandler());    }   });      //发起异步连接操作   ChannelFuture f=b.connect(host, port).sync();      //等待客户端链路关闭   f.channel().closeFuture().sync();  }finally{   //优雅退出,释放NIO线程组   group.shutdownGracefully();  } }}
package joanna.yan.netty;import java.util.logging.Logger;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.SimpleChannelInboundHandler;//public class TimeClientHandler extends ChannelHandlerAdapter{//已摒弃public class TimeClientHandler extends ChannelInboundHandlerAdapter{ private static final Logger logger=Logger.getLogger(TimeClientHandler.class.getName()); private final ByteBuf firstMessage;  public TimeClientHandler(){  byte[] req="QUERY TIME ORDER".getBytes();  firstMessage=Unpooled.buffer(req.length);  firstMessage.writeBytes(req); } /**  * 当客户端和服务端TCP链路建立成功之后,Netty的NIO线程会调用channelActive方法,  * 发送查询时间的指令给服务端。  */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {  //将请求信息发送给服务端  ctx.writeAndFlush(firstMessage); } /**  * 当服务端返回应答消息时调用channelRead方法  */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg)   throws Exception {  ByteBuf buf=(ByteBuf) msg;  byte[] req=new byte[buf.readableBytes()];  buf.readBytes(req);  String body=new String(req, "UTF-8");  System.out.println("Now is :"+body); }  /**  * 发生异常是,打印异常日志,释放客户端资源。  */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)   throws Exception {  //释放资源  logger.warning("Unexpected exception from downstream : "+cause.getMessage());  ctx.close(); } }

 ChannelInboundHandlerAdapter和SimpleChannelInboundHandler的使用区分:

   ChannelInboundHandlerAdapter是普通类,而SimpleChannelInboundHandler<T>是抽象类,继承SimpleChannelInboundHandler的类必须实现channelRead0方法;SimpleChannelInboundHandler<T>有一个重要特性,就是消息被读取后,会自动释放资源,常见的IM聊天软件的机制就类似这种。而且SimpleChannelInboundHandler类是继承了ChannelInboundHandlerAdapter类,重写了channelRead()方法,并新增抽象类。绝大部分场景都可以用ChannelInboundHandlerAdapter来处理。

4.运行与调试

  服务端运行结果:

Netty(一)——Netty入门程序

  客户端运行结果:

Netty(一)——Netty入门程序

 

  运行结果正确。可以发现,通过Netty开发的NIO服务端和客户端非常简单,短短几十行代码就能完成之前NIO程序需要几百行才能完成的功能。基于Netty的应用开发不但API使用简单、开发模式固定,而且扩展性和定制性非常好,后面,会通过更多应用来介绍Netty的强大功能。

  需要指出的是,本示例没有考虑读半包的处理,对于功能演示或者测试,上述程序没有问题,但是稍加改造进行性能或者压力测试,它就不能正确地工作了。后面我们会给出能够正确处理半包消息的应用实例。

Netty(二)——TCP粘包/拆包

 如果此文对您有帮助,微信打赏我一下吧~

Netty(一)——Netty入门程序

原标题:Netty(一)——Netty入门程序

关键词:net

net
*特别声明:以上内容来自于网络收集,著作权属原作者所有,如有侵权,请联系我们: admin#shaoqun.com (#换成@)。