你的位置:首页 > ASP.net教程

[ASP.net教程]C# Socket系列三 socket通信的封包和拆包


通过系列二 我们已经实现了socket的简单通信 接下来我们测试一下,在时间应用的场景下,我们会快速且大量的传输数据的情况!

 1  class Program 2   { 3     static void Main(string[] args) 4     { 5       TCPListener tcp = new TCPListener(); 6       TSocketClient client = new TSocketClient(); 7       for (int i = 0; i < 10; i++) 8       { 9         client.SendMsg(System.Text.UTF8Encoding.Default.GetBytes("Holle Server!"));10       }11       Console.ReadLine();12     }13   }

我们通过测试代码快速发送10条消息到服务器去,

我们看看运行结果

这样不难看出,我们的客户端发送了10条消息,但是服务器收到的时候变成了两条消息,回复客户端自然就变成两次回复。

这是为什么呢?

我们修改一下程序一秒钟发送一次消息试试

 1  class Program 2   { 3     static void Main(string[] args) 4     { 5       TCPListener tcp = new TCPListener(); 6       TSocketClient client = new TSocketClient(); 7       for (int i = 0; i < 5; i++) 8       { 9         Thread.Sleep(1000);10         client.SendMsg(System.Text.UTF8Encoding.Default.GetBytes("Holle Server!"));11       }12       Console.ReadLine();13     }14   }

运行看看,

这次对了那么分析分析到底为什么呢?

这是socket的底层,做的手脚。因为我设置socket的发送和接受缓冲区

//10K的缓冲区空间
private int BufferSize = 10 * 1024;

10k的缓冲区,

且socket的底层 发送消息会有一定间隙,虽然这个时间很短,但是我们直接for循环发送的话,时间同意很快,

因为socket.send()方法并非真实的发送数据而是把数据压入发送缓冲区。

那么我们就明白了为什么会出现上面的情况

出现了这样的情况我们要怎么解决呢?

时间应用场景不可能1秒钟才一条消息啥。

我们知道了导致这个问题的原因是因为消息发送是出现了快速压入很多发送消息到待发送缓冲区里面一起发送导致的。这样情况就是粘包了,那么我们是不是可以考虑给每一个消息加入包标识呢?

接下来我们修改一下发送包的数据代码

创建消息的构造体 TSocketMessage

 1 /// <summary> 2   /// 底层通信消息 3   /// </summary> 4   public class TSocketMessage : IDisposable 5   { 6     /// <summary> 7     /// 消息ID 8     /// </summary> 9     public int MsgID;10     /// <summary>11     /// 消息内容12     /// </summary>13     public byte[] MsgBuffer;14 15     public TSocketMessage(int msgID, byte[] msg)16     {17       this.MsgID = msgID;18       this.MsgBuffer = msg;19     }20 21     public void Dispose()22     {23       this.Dispose(true);24       GC.SuppressFinalize(this);25     }26 27     protected virtual void Dispose(bool flag1)28     {29       if (flag1) { this.MsgBuffer = null; }30     }31   }

 

接下来我们创建消息包的封装和拆分 MarshalEndian

 1 public class MarshalEndian 2   { 3     //用于存储剩余未解析的字节数 4     private List<byte> _LBuff = new List<byte>(2); 5     //默认是utf8的编码格式 6     private UTF8Encoding utf8 = new UTF8Encoding(); 7  8     //包头1 9     const Int16 t1 = 0x55; 10     //包头2 11     const Int16 t2 = 0xAA; 12     //字节数常量 两个包头4个字节,一个消息id4个字节,封装消息长度 long 8个字节 13     const long ConstLenght = 12L; 14  15     public void Dispose() 16     { 17       this.Dispose(true); 18       GC.SuppressFinalize(this); 19     } 20  21     protected virtual void Dispose(bool flag1) 22     { 23       if (flag1) 24       { 25         IDisposable disposable2 = this.utf8 as IDisposable; 26         if (disposable2 != null) { disposable2.Dispose(); } 27         IDisposable disposable = this._LBuff as IDisposable; 28         if (disposable != null) { disposable.Dispose(); } 29       } 30     } 31  32     public byte[] Encode(TSocketMessage msg) 33     { 34       MemoryStream ms = new MemoryStream(); 35       BinaryWriter bw = new BinaryWriter(ms, new UTF8Encoding()); 36       byte[] msgBuffer = msg.MsgBuffer; 37  38       #region 封装包头 39       bw.Write((Int16)t1); 40       bw.Write((Int16)t2); 41       #endregion 42  43       #region 包协议 44       if (msgBuffer != null) 45       { 46         bw.Write((Int64)(msgBuffer.Length + 4)); 47         bw.Write(msg.MsgID); 48         bw.Write(msgBuffer); 49       } 50       else { bw.Write((Int64)0); } 51       #endregion 52  53       bw.Close(); 54       ms.Close(); 55       bw.Dispose(); 56       ms.Dispose(); 57       return ms.ToArray(); 58     } 59  60     public List<TSocketMessage> GetDcAppMess(byte[] buff, int len) 61     { 62       //拷贝本次的有效字节 63       byte[] _b = new byte[len]; 64       Array.Copy(buff, 0, _b, 0, _b.Length); 65       buff = _b; 66       if (this._LBuff.Count > 0) 67       { 68         //拷贝之前遗留的字节 69         this._LBuff.AddRange(_b); 70         buff = this._LBuff.ToArray(); 71         this._LBuff.Clear(); 72         this._LBuff = new List<byte>(2); 73       } 74  75       List<TSocketMessage> list = new List<TSocketMessage>(); 76       MemoryStream ms = new MemoryStream(buff); 77       BinaryReader buffers = new BinaryReader(ms, this.utf8); 78       try 79       { 80         byte[] _buff; 81       Label_0073: 82         //判断本次解析的字节是否满足常量字节数  83         if ((buffers.BaseStream.Length - buffers.BaseStream.Position) < ConstLenght) 84         { 85           _buff = new byte[(int)(buffers.BaseStream.Length - buffers.BaseStream.Position)]; 86           Array.Copy(buff, (int)buffers.BaseStream.Position, _buff, 0, _buff.Length); 87           this._LBuff.AddRange(_buff); 88           return list; 89         } 90         #region 包头读取 91       //循环读取包头 92       Label_00983: 93         Int16 tt1 = buffers.ReadInt16(); 94         Int16 tt2 = buffers.ReadInt16(); 95         if (!(tt1 == t1 && tt2 == t2)) 96         { 97           long ttttt = buffers.BaseStream.Seek(-3, SeekOrigin.Current); 98           goto Label_00983; 99         }100         #endregion101 102         #region 包协议103         long offset = buffers.ReadInt64();104         #endregion105 106         #region 包解析107         //剩余字节数大于本次需要读取的字节数108         if (offset < (buffers.BaseStream.Length - buffers.BaseStream.Position))109         {110           int msgID = buffers.ReadInt32();111           _buff = new byte[offset - 4];112           Array.Copy(buff, (int)buffers.BaseStream.Position, _buff, 0, _buff.Length);113           list.Add(new TSocketMessage(msgID, _buff));114           //设置偏移量 然后继续循环读取115           buffers.BaseStream.Seek(offset, SeekOrigin.Current);116           goto Label_0073;117         }118         else if (offset == (buffers.BaseStream.Length - buffers.BaseStream.Position))119         {120           int msgID = buffers.ReadInt32();121           //剩余字节数刚好等于本次读取的字节数122           _buff = new byte[offset - 4];123           Array.Copy(buff, (int)buffers.BaseStream.Position, _buff, 0, _buff.Length);124           list.Add(new TSocketMessage(msgID, _buff));125         }126         else127         {128           //剩余字节数刚好小于本次读取的字节数 存起来,等待接受剩余字节数一起解析129           _buff = new byte[(int)(buffers.BaseStream.Length - buffers.BaseStream.Position + ConstLenght)];130           Array.Copy(buff, (int)(buffers.BaseStream.Position - ConstLenght), _buff, 0, _buff.Length);131           buff = _buff;132           this._LBuff.AddRange(_buff);133         }134         #endregion135 136       }137       catch { }138       finally139       {140         if (buffers != null) { buffers.Dispose(); }141         buffers.Close();142         if (buffers != null) { buffers.Dispose(); }143         ms.Close();144         if (ms != null) { ms.Dispose(); }145       }146       return list;147     }148   }

 

接下来我们修改一下 TSocketBase 的 抽象方法  

1 public abstract void Receive(TSocketMessage msg);

 

在修改接受消息回调函数

 1  /// <summary> 2     /// 消息解析器 3     /// </summary> 4     MarshalEndian mersha = new MarshalEndian(); 5  6     /// <summary> 7     /// 接收消息回调函数 8     /// </summary> 9     /// <param name="iar"></param>10     private void ReceiveCallback(IAsyncResult iar)11     {12       if (!this.IsDispose)13       {14         try15         {16           //接受消息17           ReceiveSize = _Socket.EndReceive(iar, out ReceiveError);18           //检查状态码19           if (!CheckSocketError(ReceiveError) && SocketError.Success == ReceiveError)20           {21             //判断接受的字节数22             if (ReceiveSize > 0)23             {24               byte[] rbuff = new byte[ReceiveSize];25               Array.Copy(this.Buffers, rbuff, ReceiveSize);26               var msgs = mersha.GetDcAppMess(rbuff, ReceiveSize);27               foreach (var msg in msgs)28               {29                 this.Receive(msg);30               }31               //重置连续收到空字节数32               ZeroCount = 0;33               //继续开始异步接受消息34               ReceiveAsync();35             }36             else37             {38               ZeroCount++;39               if (ZeroCount == 5) { this.Close("错误链接"); }40             }41           }42         }43         catch (System.Net.Sockets.SocketException) { this.Close("链接已经被关闭"); }44         catch (System.ObjectDisposedException) { this.Close("链接已经被关闭"); }45       }46     }

这样我们完成了在收到消息后对数据包的解析。

修改一下TSocketClient的 Receive 重写方法

 1     /// <summary> 2     /// 收到消息后 3     /// </summary> 4     /// <param name="rbuff"></param> 5     public override void Receive(TSocketMessage msg) 6     { 7       Console.WriteLine("Receive ID:" + msg.MsgID + " Msg:" + System.Text.UTF8Encoding.Default.GetString(msg.MsgBuffer)); 8       if (isServer) 9       {10         this.SendMsg(new TSocketMessage(msg.MsgID, System.Text.UTF8Encoding.Default.GetBytes("Holle Client!")));11       }12     }

修改测试代码如下

 1   class Program 2   { 3     static void Main(string[] args) 4     { 5       TCPListener tcp = new TCPListener(); 6       TSocketClient client = new TSocketClient(); 7       for (int i = 1; i < 5; i++) 8       { 9         Thread.Sleep(1000);10         client.SendMsg(new TSocketMessage(i, System.Text.UTF8Encoding.Default.GetBytes("Holle Server!")));11       }12       Console.ReadLine();13     }14   }

运行结果

 

接受成功了,那么我们取消暂停状态,快速发送消息试试

 1   class Program 2   { 3     static void Main(string[] args) 4     { 5       TCPListener tcp = new TCPListener(); 6       TSocketClient client = new TSocketClient(); 7       for (int i = 1; i < 5; i++) 8       { 9         client.SendMsg(new TSocketMessage(i, System.Text.UTF8Encoding.Default.GetBytes("Holle Server!")));10       }11       Console.ReadLine();12     }13   }

看看运行结果

 

瞬间完成了消息发送,也没有再出现第一次运行的那样~!

这样完美的解决了socket通信 在传输上发送粘包问题

 

下载程序完整代码