你的位置:首页 > Java教程

[Java教程]MINA 网络黏包处理代码


本文完整代码,可以浏览:

https://github.com/hjj2017/xgame-code_server/blob/master/game_server/src/com/game/gameServer/framework/mina/MsgCumulativeFilter.java

 

我在网上查阅过的 MINA 黏包处理,一般都是放在 Decoder 中做的。也就是黏包处理和消息解码放在一起做,显得比较混乱不好打理。而以下这段代码,我是把黏包处理放在 Filter 中了。在具体使用时可以这样:

 1 // 创建 IO 接收器 2 NioSocketAcceptor acceptor = new NioSocketAcceptor(); 3  4 // 获取责任链 5 DefaultIoFilterChainBuilder chain = acceptor.getFilterChain(); 6 // 处理网络粘包 7 chain.addLast("msgCumulative", new MsgCumulativeFilter()); 8  9 // 添加自定义编解码器10 chain.addLast("msgCodec", new ProtocolCodecFilter(11   new XxxEncoder(),12   new XxxDecoder()13 ));14 15 // 获取会话配置16 IoSessionConfig cfg = acceptor.getSessionConfig();17 18 // 设置缓冲区大小19 cfg.setReadBufferSize(4096);20 // 设置 session 空闲时间21 cfg.setIdleTime(IdleStatus.BOTH_IDLE, 10);22 23 // 设置 IO 句柄24 acceptor.setHandler(new XxxHandler());25 acceptor.setReuseAddress(true);26 27 try {28   // 绑定端口29   acceptor.bind(new InetSocketAddress("127.0.0.1", 4400));30 } catch (Exception ex) {31   // 输出错误日志32   System.error.println(ex);33 }

目前 Netty 框架要比 MINA 流行的多,而且 Netty 对网络黏包处理也做了很好的处理,不用开发者自己费那么大劲。我也考虑过迁移到 Netty 框架上,不过目前还没有找到特别充分的理由。闲话不多说了,以下就是黏包处理代码:

 1 package com.game.gameServer.framework.mina; 2  3 import java.util.concurrent.ConcurrentHashMap; 4  5 import org.apache.mina.core.buffer.IoBuffer; 6 import org.apache.mina.core.filterchain.IoFilterAdapter; 7 import org.apache.mina.core.session.IoSession; 8  9 import com.game.gameServer.framework.FrameworkLog; 10 import com.game.gameServer.msg.SpecialMsgSerialUId; 11 import com.game.part.msg.IoBuffUtil; 12  13 /** 14  * 消息粘包处理 15  *  16  * @author hjj2017 17  * @since 2014/3/17 18  *  19 */ 20 class MsgCumulativeFilter extends IoFilterAdapter { 21   /**  22    * 从客户端接收的消息估计长度, 23    * {@value} 字节,  24    * 对于从客户端接收的数据来说, 都是简单的命令!  25    * 很少超过 {@value}B 26    *  27   */ 28   private static final int DECODE_MSG_LEN = 64; 29   /** 容器 Buff 字典 */ 30   private static final ConcurrentHashMap<Long, IoBuffer> _containerBuffMap = new ConcurrentHashMap<>(); 31  32   @Override 33   public void sessionClosed(NextFilter nextFilter, IoSession sessionObj) throws Exception { 34     if (nextFilter == null ||  35       sessionObj == null) { 36       // 如果参数对象为空,  37       // 则直接退出! 38       FrameworkLog.LOG.error("null nextFilter or sessionObj"); 39       return; 40     } 41  42     // 移除容器 Buff 43     removeContainerBuff(sessionObj); 44     // 向下传递 45     super.sessionClosed(nextFilter, sessionObj); 46   } 47  48   @Override 49   public void messageReceived( 50     NextFilter nextFilter, IoSession sessionObj, Object msgObj) throws Exception { 51     if (nextFilter == null ||  52       sessionObj == null) { 53       // 如果参数对象为空,  54       // 则直接退出! 55       FrameworkLog.LOG.error("null nextFilter or sessionObj"); 56       return; 57     } 58  59     // 获取会话 UId 60     long sessionUId = sessionObj.getId(); 61  62     if (!(msgObj instanceof IoBuffer)) { 63       // 如果消息对象不是 ByteBuff,  64       // 则直接向下传递! 65       FrameworkLog.LOG.warn("msgObj is not a IoBuff, sessionUId = " + sessionUId); 66       super.messageReceived(nextFilter, sessionObj, msgObj); 67     } 68  69     // 获取输入 Buff 70     IoBuffer inBuff = (IoBuffer)msgObj; 71  72     if (!inBuff.hasRemaining()) { 73       // 如果没有剩余内容,  74       // 则直接退出! 75       FrameworkLog.LOG.error("inBuff has not remaining, sessionUId = " + sessionUId); 76       return; 77     } else if (inBuff.remaining() <= 8) { 78       // 如果 <= 8 字节,  79       // 那还是执行粘包处理过程吧 ... 80       // 8 字节 = 消息长度 ( Short ) + 消息类型 ( Short ) + 时间戳 ( Int ) 81       // 如果比这个长度都小,  82       // 那肯定不是一条完整消息 ... 83       this.msgRecv_0(nextFilter, sessionObj, inBuff); 84       return; 85     } 86  87     // 获取消息长度 88     final int msgSize = inBuff.getShort(); 89     inBuff.position(0); 90  91     if (msgSize == inBuff.limit() &&  92       containerBuffIsEmpty(sessionObj)) { 93       //  94       // 如果消息长度和极限值刚好相同,  95       // 并且容器 Buff 中没有任何内容 ( 即, 上一次消息没有粘包 ), 96       // 那么直接向下传递! 97       //  98       super.messageReceived( 99         nextFilter, sessionObj, inBuff100       );101     } else {102       // 103       // 如果消息长度和极限值不同, 104       // 则说明是网络粘包!105       // 这时候跳转到粘包处理过程 ...106       // 107       this.msgRecv_0(nextFilter, sessionObj, inBuff);108     }109   }110 111   /**112    * 接收连包消息113    * 114    * @param nextFilter115    * @param sessionObj116    * @param inBuff117    * @throws Exception 118    * 119   */120   private void msgRecv_0(121     NextFilter nextFilter, IoSession sessionObj, IoBuffer inBuff) throws Exception {122     if (nextFilter == null || 123       sessionObj == null) {124       // 如果参数对象为空, 125       // 则直接退出!126       FrameworkLog.LOG.error("null nextFilter or sessionObj");127       return;128     }129 130     // 获取会话 UId131     long sessionUId = sessionObj.getId();132     // 获取容器 Buff133     IoBuffer containerBuff = getContainerBuff(sessionObj);134 135     // 添加新 Buff 到容器 Buff 的末尾136     IoBuffUtil.append(containerBuff, inBuff);137     // 令 position = 0138     containerBuff.position(0);139 140 //    // 记录调试信息141 //    FrameworkLog.LOG.debug("\nin = [ " + inBuff.getHexDump() + " ]");142 143     for (int i = 0; ; i++) {144 //      // 记录调试信息145 //      FrameworkLog.LOG.debug(146 //        "i = " + i 147 //        + "\nco = [ " + containerBuff.getHexDump() + " ]"148 //        + "\nco.pos = " + containerBuff.position() 149 //        + "\nco.lim = " + containerBuff.limit()150 //      );151 152       if (containerBuff.remaining() < 4) {153         // 154         // 如果剩余字节数 < 4, 155         // 这样根本无法识别出消息类型 msgSerialUId ...156         // 直接退出!157         // 在退出前, 158         // 准备好接收下一次消息!159         // 160         IoBuffUtil.readyToNext(containerBuff);161         return;162       }163 164       // 获取原始位置165       final int oldPos = containerBuff.position();166       // 获取消息长度和类型167       final int msgSize = containerBuff.getShort();168       final int msgSerialUId = containerBuff.getShort();169 170 //      // 记录调试信息171 //      FrameworkLog.LOG.debug(172 //        "i = " + i 173 //        + "\nmsgSize = " + msgSize174 //        + "\nmsgSerialUId = " + msgSerialUId175 //      );176 177       // 还原原始位置178       containerBuff.position(oldPos);179 180       if (msgSerialUId == SpecialMsgSerialUId.CG_FLASH_POLICY || 181         msgSerialUId == SpecialMsgSerialUId.CG_QQ_TGW) {182         // 183         // 如果是 Flash 安全策略消息, 184         // 或者是腾讯网关消息, 185         // 则尝试找一下 0 字节的位置 ...186         // 187         int pos0 = IoBuffUtil.indexOf(containerBuff, (byte)0);188 189         if (pos0 <= -1) {190           // 如果找不到 0 字节的位置, 191           // 则说明消息还没接收完, 192           // 准备接受下次消息并直接退出!193           IoBuffUtil.readyToNext(containerBuff);194           return;195         }196 197         // 复制 Buff 内容198         containerBuff.position(0);199         IoBuffer realBuff = IoBuffUtil.copy(containerBuff, pos0);200 201         // 更新 Buff 位置202         final int newPos = containerBuff.position() + pos0;203         containerBuff.position(newPos);204         // 压缩容器 Buff205         IoBuffUtil.compact(containerBuff);206 207         // 向下传递208         super.messageReceived(209           nextFilter, sessionObj, realBuff210         );211         continue;212       }213 214       if (msgSize <= 0) {215         // 216         // 如果消息长度 <= 0, 217         // 则直接退出!218         // 这种情况可能是消息已经乱套了 ...219         // 还是重新来过吧!220         // 221         FrameworkLog.LOG.error("i = " + i + ", msgSize = " + msgSize + ", sessionUId = " + sessionUId);222         // 将容器 Buff 内容清空223         containerBuff.position(0);224         containerBuff.flip();225         // 压缩容器 Buff226         IoBuffUtil.compact(containerBuff);227         return;228       }229 230       if (containerBuff.remaining() < msgSize) {231         // 232         // 如果消息长度不够, 233         // 则可能是出现网络粘包情况了 ...234         // 直接退出就可以了!235         // 236         FrameworkLog.LOG.warn(237           "i = " + i238           + ", msgSize = " + msgSize 239           + ", containerBuff.remaining = " + containerBuff.remaining()240           + ", sessionUId = " + sessionUId241         );242 243         // 准备接受下一次消息244         IoBuffUtil.readyToNext(containerBuff);245         return;246       }247 248       // 创建新 Buff 并复制字节内容249       IoBuffer realBuff = IoBuffUtil.copy(containerBuff, msgSize);250 251       if (realBuff == null) {252         // 253         // 如果真实的 Buff 为空, 254         // 则直接退出!255         // 这种情况可能也是消息乱套了 ...256         // 记录一下错误信息257         // 258         FrameworkLog.LOG.error("i = " + i + ", null realBuff, sessionUId = " + sessionUId);259       } else {260 //        // 记录调试信息261 //        FrameworkLog.LOG.debug(262 //          "i = " + i263 //          + "\nreal = [ " + realBuff.getHexDump() + " ]"264 //          + "\nreal.pos = " + realBuff.position()265 //          + "\nreal.lim = " + realBuff.limit()266 //        );267 268         // 向下传递269         super.messageReceived(270           nextFilter, sessionObj, realBuff271         );272       }273 274       // 更新位置275       containerBuff.position(containerBuff.position() + msgSize);276       // 压缩容器 Buff277       IoBuffUtil.compact(containerBuff);278     }279   }280   281   /**282    * 获取玩家的 Buff, 如果为空则新建一个!283    * 284    * @param sessionObj285    * @return 286    * 287   */288   private static IoBuffer getContainerBuff(IoSession sessionObj) {289     if (sessionObj == null) {290       // 如果参数对象为空, 291       // 则直接退出!292       return null;293     }294 295     // 获取会话 UId296     long sessionUId = sessionObj.getId();297     // 获取容器 Buff298     IoBuffer containerBuff = _containerBuffMap.get(sessionUId);299 300     if (containerBuff == null) {301       // 创建缓存 Buff302       containerBuff = IoBuffer.allocate(DECODE_MSG_LEN);303       containerBuff.setAutoExpand(true);304       containerBuff.setAutoShrink(true);305       containerBuff.position(0);306       containerBuff.flip();307       // 缓存 Buff 对象308       Object oldVal = _containerBuffMap.putIfAbsent(sessionUId, containerBuff);309 310       if (oldVal != null) {311         FrameworkLog.LOG.warn("exists oldVal");312       }313     }314 315     return containerBuff;316   }317 318   /**319    * 移除容器 Buff320    * 321    * @param sessionObj322    * 323   */324   private static void removeContainerBuff(IoSession sessionObj) {325     if (sessionObj == null) {326       // 如果参数对象为空, 327       // 则直接退出!328       return;329     }330 331     // 获取会话 UId332     long sessionUId = sessionObj.getId();333     // 获取容器 Buff334     IoBuffer containerBuff = _containerBuffMap.get(sessionUId);335 336     if (containerBuff != null) {337       // 是否所占资源338       containerBuff.clear();339     }340 341     // 移除玩家的 Buff 对象342     _containerBuffMap.remove(sessionUId);343   }344 345   /**346    * 容器 Buff 为空 ?347    * 348    * @param sessionObj349    * @return 350    * 351   */352   private static boolean containerBuffIsEmpty(IoSession sessionObj) {353     if (sessionObj == null) {354       // 如果参数对象为空, 355       // 则直接退出!356       return false;357     }358 359     // 获取容器 Buff360     IoBuffer containerBuff = getContainerBuff(sessionObj);361 362     if (containerBuff == null) {363       // 如果容器为空, 364       // 则直接退出!365       FrameworkLog.LOG.error("null containerBuff, sessionUId = " + sessionObj.getId());366       return false;367     } else {368       // 如果当前位置和极限值都为 0, 369       // 则判定为空!370       return (containerBuff.position() == 0 371         && containerBuff.limit() == 0);372     }373   }374 }