你的位置:首页 > ASP.net教程

[ASP.net教程]java netty socket库和自定义C#socket库利用protobuf进行通信完整实例


之前的文章讲述了socket通信的一些基本知识,已经本人自定义的C#版本的socket、和java netty 库的二次封装,但是没有真正的发表测试用例。

本文只是为了讲解利用protobuf 进行C# 和 java的通信。以及完整的实例代码

java 代码 svn 地址,本人开发工具是NetBeans 8.0.2 使用 maven 项目编译

http://code.taobao.org/svn/flynetwork_csharp/trunk/BlogTest

c# 代码 svn 地址 使用的是 vs 2013 .net 4.5

http://code.taobao.org/svn/flynetwork_csharp/trunk/Flynetwork/BlogTest

 

本文着重以C# socket作为服务器端,java netty作为socket的客户端进行访问通信

首先附上proto的message文件

package Sz.Test.ProtoMessage;//登陆消息message TestMessage {  //消息枚举  enum Proto_Login {    ResTip             = 101201;//服务器推送提示    ReqLogin            = 101102;//客户端申请登陆    ReqChat            = 101103;//客户端申请聊天消息    ResChat            = 101203;//服务器推送聊天消息      }  //服务器推送提示 ResTip  message ResTipMessage {    required string msg           = 1;//提示内容  }  //客户端申请登陆 ReqLogin  message ReqLoginMessage {    required string userName        = 1;//登陆用户名    required string userPwd         = 2;//登陆密码  }   //客户端申请登陆 ReqChat  message ReqChatMessage {    required string msg           = 1;//提示内容  }  //客户端申请登陆 ResChat  message ResChatMessage {    required string msg           = 1;//提示内容  }}

本人编译工具自带生产消息,和对应的handler

先把proto文件编译生产后,放到哪里,然后创建服务器监听代码

上一篇文章讲到由于java和C#默认网络端绪不一样,java是标准端绪大端序,C#使用的小端序。

1       MarshalEndian.JN = MarshalEndian.JavaOrNet.Java;2       Sz.Network.SocketPool.ListenersBox.Instance.SetParams(new MessagePool(), typeof(MarshalEndian));3       Sz.Network.SocketPool.ListenersBox.Instance.Start("tcp:*:9527");

所以在我开启服务器监听的时候设置解码器和编码器的解析风格为java

然后建立一个文件chat文件夹用于存放handler文件就是刚才工具生成 目录下的 ExcelSource\protobuf\net\Handler

这一系列文件

 1 if (message.MsgID == (int)Sz.Test.ProtoMessage.TestMessage.Proto_Login.ReqLogin) 2       { 3         //构建消息 4         Sz.Test.ProtoMessage.TestMessage.ReqLoginMessage loginmessage = new Test.ProtoMessage.TestMessage.ReqLoginMessage(); 5         object msg = DeSerialize(message.MsgBuffer, loginmessage); 6         //构建handler 7         Test.ProtoMessage.ReqLoginHandler handler = new Test.ProtoMessage.ReqLoginHandler(); 8         handler.Session = client; 9         handler.Message = loginmessage;10         //把handler交给 登录 线程处理11         ThreadManager.Instance.AddTask(ServerManager.LoginThreadID, handler);12       }13       else if (message.MsgID == (int)Sz.Test.ProtoMessage.TestMessage.Proto_Login.ReqChat)14       {15         //构建消息16         Sz.Test.ProtoMessage.TestMessage.ReqChatMessage loginmessage = new Test.ProtoMessage.TestMessage.ReqChatMessage();17         object msg = DeSerialize(message.MsgBuffer, loginmessage);18         //构建handler19         Test.ProtoMessage.ReqChatHandler handler = new Test.ProtoMessage.ReqChatHandler();20         handler.Session = client;21         handler.Message = loginmessage;22         //把handler交给 聊天 线程处理23         ThreadManager.Instance.AddTask(ServerManager.ChatThreadID, handler);24       }

收到消息后的处理判断传入的消息id是什么类型的,然后对应反序列化byte[]数组为消息

最后把消息和生成handler移交到对应的线程处理

登录的消息全部交给 LoginThread 线程 去处理 ,这样在真实的运行环境下,能保证单点登录问题;

聊天消息全部交给 ChatThread 线程 去处理 这样的好处是,聊天与登录无关;

收到登录消息的处理

 1   public class ReqLoginHandler : TcpHandler 2   { 3     public override void Run() 4     { 5        6       var message = (Sz.Test.ProtoMessage.TestMessage.ReqLoginMessage)this.Message; 7       Sz.Test.ProtoMessage.TestMessage.ResTipMessage tip = new TestMessage.ResTipMessage(); 8       if (message.userName == "admin" && message.userPwd == "admin") 9       {10         Logger.Debug("收到登录消息 登录完成"); 11         tip.msg = "登录完成";12       }13       else14       {15         Logger.Debug("收到登录消息 用户名或者密码错误"); 16         tip.msg = "用户名或者密码错误";17       }18       byte[] buffer = MessagePool.Serialize(tip);19       this.Session.SendMsg(new Network.SocketPool.SocketMessage((int)Sz.Test.ProtoMessage.TestMessage.Proto_Login.ResTip, buffer));20     }21   }

收到聊天消息的处理

 1   public class ReqChatHandler : TcpHandler 2   { 3     public override void Run() 4     { 5       var message = (Sz.Test.ProtoMessage.TestMessage.ReqChatMessage)this.Message; 6       Logger.Debug("收到来自客户端聊天消息:" + message.msg); 7       Sz.Test.ProtoMessage.TestMessage.ResChatMessage chat = new TestMessage.ResChatMessage(); 8       chat.msg = "服务器广播:" + message.msg; 9       byte[] buffer = MessagePool.Serialize(chat);10       this.Session.SendMsg(new Network.SocketPool.SocketMessage((int)Sz.Test.ProtoMessage.TestMessage.Proto_Login.ResChat, buffer));11     }12   }

 

接下来我们构建 

java版本基于netty 二次封装的socket客户端

 1 package sz.network.socketpool.nettypool; 2  3 import Sz.Test.ProtoMessage.Test.TestMessage; 4 import com.google.protobuf.InvalidProtocolBufferException; 5 import io.netty.channel.ChannelHandlerContext; 6 import java.io.BufferedReader; 7 import java.io.IOException; 8 import java.io.InputStreamReader; 9 import java.util.logging.Level;10 import org.apache.log4j.Logger;11 12 /**13  *14  * @author Administrator15 */16 public class TestClient {17 18   static final Logger log = Logger.getLogger(TestClient.class);19   static NettyTcpClient client = null;20 21   public static void main(String[] args) {22     client = new NettyTcpClient("127.0.0.1", 9527, true, new NettyMessageHandler() {23 24       @Override25       public void channelActive(ChannelHandlerContext session) {26         log.info("连接服务器成功:");27         //构建错误的登录消息28         TestMessage.ReqLoginMessage.Builder newBuilder = TestMessage.ReqLoginMessage.newBuilder();29         newBuilder.setUserName("a");30         newBuilder.setUserPwd("a");31         //发送消息32         TestClient.client.sendMsg(new NettyMessageBean(TestMessage.Proto_Login.ReqLogin_VALUE, newBuilder.build().toByteArray()));33 34         //构建正确的登录消息35         TestMessage.ReqLoginMessage.Builder newBuilder1 = TestMessage.ReqLoginMessage.newBuilder();36         newBuilder1.setUserName("admin");37         newBuilder1.setUserPwd("admin");38         TestClient.client.sendMsg(new NettyMessageBean(TestMessage.Proto_Login.ReqLogin_VALUE, newBuilder1.build().toByteArray()));39       }40 41       @Override42       public void readMessage(NettyMessageBean msg) {43         try {44           if (msg.getMsgid() == TestMessage.Proto_Login.ResTip_VALUE) {45             TestMessage.ResTipMessage tipmessage = TestMessage.ResTipMessage.parseFrom(msg.getMsgbuffer());46             log.info("收到提示信息:" + tipmessage.getMsg());47           } else if (msg.getMsgid() == TestMessage.Proto_Login.ResChat_VALUE) {48             TestMessage.ResChatMessage tipmessage = TestMessage.ResChatMessage.parseFrom(msg.getMsgbuffer());49             log.info("收到聊天消息:" + tipmessage.getMsg());50           }51         } catch (InvalidProtocolBufferException ex) {52           log.error("收到消息:" + msg.getMsgid() + " 解析出错:" + ex);53         }54       }55 56       @Override57       public void closeSession(ChannelHandlerContext session) {58         log.info("连接关闭或者连接不成功:");59       }60 61       @Override62       public void exceptionCaught(ChannelHandlerContext session, Throwable cause) {63         log.info("错误:" + cause.toString());64       }65     });66     client.Connect();67 68     BufferedReader strin = new BufferedReader(new InputStreamReader(System.in));69     while (true) {70       try {71         String str = strin.readLine();72         //构建聊天消息73         TestMessage.ReqChatMessage.Builder chatmessage = TestMessage.ReqChatMessage.newBuilder();74         chatmessage.setMsg(str);75         TestClient.client.sendMsg(new NettyMessageBean(TestMessage.Proto_Login.ReqChat_VALUE, chatmessage.build().toByteArray()));76       } catch (IOException ex) {77       }78     }79 80   }81 82 }

 

接下来我们看看效果

 

我设置了断线重连功能,我们来测试一下,把服务器关闭

 

可以看到没3秒向服务器发起一次请求;

知道服务器再次开启链接成功

完整的通信示例演示就完了;

代码我不在上传了,请各位使用svn下载好么????

需要注意的是,消息的解码器和编码器,一定要双方都遵守你自己的契约。比如我在编码消息格式的时候先写入消息包的长度,然后跟上消息的id,再是消息的内容

所以解码的时候,先读取一个消息长度,在读取一个消息id,如果本次收到的消息字节数不够长度那么留存起来以用于下一次收到字节数组追加后再一起解析。

这样就能解决粘包的问题。

附上C#版本的解析器

 

 1 using System; 2 using System.Collections.Generic; 3 using System.IO; 4 using System.Linq; 5 using System.Text; 6 using System.Threading.Tasks; 7  8 /** 9  *  10  * @author 失足程序员 11  * @Blog http://www.cnblogs.com/ty408/ 12  * @mail 492794628@qq.com 13  * @phone 13882122019 14  *  15 */ 16 namespace Sz.Network.SocketPool 17 { 18   public class MarshalEndian : IMarshalEndian 19   { 20  21     public enum JavaOrNet 22     { 23       Java, 24       Net, 25     } 26  27     public MarshalEndian() 28     { 29  30     } 31  32     public static JavaOrNet JN = JavaOrNet.Net; 33  34     /// <summary> 35     /// 读取大端序的int 36     /// </summary> 37     /// <param name="value"></param> 38     public int ReadInt(byte[] intbytes) 39     { 40       Array.Reverse(intbytes); 41       return BitConverter.ToInt32(intbytes, 0); 42     } 43  44     /// <summary> 45     /// 写入大端序的int 46     /// </summary> 47     /// <param name="value"></param> 48     public byte[] WriterInt(int value) 49     { 50       byte[] bs = BitConverter.GetBytes(value); 51       Array.Reverse(bs); 52       return bs; 53     } 54  55     //用于存储剩余未解析的字节数 56     private List<byte> _LBuff = new List<byte>(2); 57  58     //字节数常量一个消息id4个字节 59     const long ConstLenght = 4L; 60  61     public void Dispose() 62     { 63       this.Dispose(true); 64       GC.SuppressFinalize(this); 65     } 66  67     protected virtual void Dispose(bool flag1) 68     { 69       if (flag1) 70       { 71         IDisposable disposable = this._LBuff as IDisposable; 72         if (disposable != null) { disposable.Dispose(); } 73       } 74     } 75  76     public byte[] Encoder(SocketMessage msg) 77     { 78       MemoryStream ms = new MemoryStream(); 79       BinaryWriter bw = new BinaryWriter(ms, UTF8Encoding.Default); 80       byte[] msgBuffer = msg.MsgBuffer; 81  82       if (msgBuffer != null) 83       { 84         switch (JN) 85         { 86           case JavaOrNet.Java: 87             bw.Write(WriterInt(msgBuffer.Length + 4)); 88             bw.Write(WriterInt(msg.MsgID)); 89             break; 90           case JavaOrNet.Net: 91             bw.Write((Int32)(msgBuffer.Length + 4)); 92             bw.Write(msg.MsgID); 93             break; 94         } 95  96         bw.Write(msgBuffer); 97       } 98       else 99       {100         switch (JN)101         {102           case JavaOrNet.Java:103             bw.Write(WriterInt(0));104             break;105           case JavaOrNet.Net:106             bw.Write((Int32)0);107             break;108         }109       }110       bw.Close();111       ms.Close();112       bw.Dispose();113       ms.Dispose();114       return ms.ToArray();115     }116 117     public List<SocketMessage> Decoder(byte[] buff, int len)118     {119       //拷贝本次的有效字节120       byte[] _b = new byte[len];121       Array.Copy(buff, 0, _b, 0, _b.Length);122       buff = _b;123       if (this._LBuff.Count > 0)124       {125         //拷贝之前遗留的字节126         this._LBuff.AddRange(_b);127         buff = this._LBuff.ToArray();128         this._LBuff.Clear();129         this._LBuff = new List<byte>(2);130       }131       List<SocketMessage> list = new List<SocketMessage>();132       MemoryStream ms = new MemoryStream(buff);133       BinaryReader buffers = new BinaryReader(ms, UTF8Encoding.Default);134       try135       {136         byte[] _buff;137       Label_0073:138         //判断本次解析的字节是否满足常量字节数 139         if ((buffers.BaseStream.Length - buffers.BaseStream.Position) < ConstLenght)140         {141           _buff = buffers.ReadBytes((int)(buffers.BaseStream.Length - buffers.BaseStream.Position));142           this._LBuff.AddRange(_buff);143         }144         else145         {146           long offset = 0;147           switch (JN)148           {149             case JavaOrNet.Java:150               offset = ReadInt(buffers.ReadBytes(4));151               break;152             case JavaOrNet.Net:153               offset = buffers.ReadInt32();154               break;155           }156 157           //剩余字节数大于本次需要读取的字节数158           if (offset <= (buffers.BaseStream.Length - buffers.BaseStream.Position))159           {160             int msgID = 0;161             switch (JN)162             {163               case JavaOrNet.Java:164                 msgID = ReadInt(buffers.ReadBytes(4));165                 break;166               case JavaOrNet.Net:167                 msgID = buffers.ReadInt32();168                 break;169             }170             _buff = buffers.ReadBytes((int)(offset - 4));171             list.Add(new SocketMessage(msgID, _buff));172             goto Label_0073;173           }174           else175           {176             //剩余字节数刚好小于本次读取的字节数 存起来,等待接受剩余字节数一起解析177             buffers.BaseStream.Seek(ConstLenght, SeekOrigin.Current);178             _buff = buffers.ReadBytes((int)(buffers.BaseStream.Length - buffers.BaseStream.Position));179             this._LBuff.AddRange(_buff);180           }181         }182       }183       catch { }184       finally185       {186         buffers.Close();187         if (buffers != null) { buffers.Dispose(); }188         ms.Close();189         if (ms != null) { ms.Dispose(); }190       }191       return list;192     }193   }194 }

谢谢观赏~!