欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

高性能TcpServer - 3.命令通道(处理:掉包,粘包,垃圾包)

程序员文章站 2022-07-02 13:26:47
高性能TcpServer - 1.网络通信协议 高性能TcpServer - 2.创建高性能Socket服务器SocketAsyncEventArgs的实现(IOCP) 高性能TcpServer - 3.命令通道(处理:掉包,粘包,垃圾包) 高性能TcpServer - 4.文件通道(处理:文件分包 ......

高性能tcpserver - 1.网络通信协议

高性能tcpserver - 2.创建高性能socket服务器socketasynceventargs的实现(iocp)

高性能tcpserver - 3.命令通道(处理:掉包,粘包,垃圾包)

高性能tcpserver - 4.文件通道(处理:文件分包,支持断点续传)

高性能tcpserver - 5.客户端管理

高性能tcpserver - 6.代码下载

 

处理原理

每个client创建各自的byte[]数组,通过遍历每个字节的数据

1.判断包长,确定掉包;

2.判断解析完后byte数组是否还有未解析的数据,确定粘包;

3.判断包头,确定垃圾包;

 

缓存数据类

 /// <summary>

    /// 缓存数据类

    /// </summary>

    public class cbytebuffer

    {

        // 默认1k

        int m_ibuffersize = 1024 * 1;

 

        // 数据解析

        byte[] m_abybuf;

        int m_iposition = 0;

        int m_irecvlength = 0;

        bool bwaitrecvremain;// 数据未接收完等待接收

        object m_lock = new object(); // 内部同步锁

 

        public int position

        {

            get { return m_iposition; }

            set { m_iposition = value; }

        }

 

        public int recvlength

        {

            get { return m_irecvlength; }

            set { m_irecvlength = value; }

        }

 

        public bool waitrecvremain

        {

            get { return bwaitrecvremain; }

            set { bwaitrecvremain = value; }

        }

 

        public cbytebuffer(int buffsize)

        {

            m_ibuffersize = buffsize;

            m_abybuf = new byte[m_ibuffersize];

        }

 

        public int getposition()

        {

            return m_iposition;

        }

 

        public int getrecvlength()

        {

            return m_irecvlength;

        }

 

        public void put(socketasynceventargs e)

        {

            int ilength = e.bytestransferred;

            if (m_irecvlength + ilength >= m_ibuffersize)

            {

                clear();

                return;

            }

 

            lock (m_lock)

            {

                array.copy(e.buffer, e.offset, m_abybuf, m_irecvlength, ilength);

                m_irecvlength += ilength;

            }

        }

 

        public byte getbyte()

        {

            bwaitrecvremain = false;

 

            if (m_iposition >= m_irecvlength)

            {

                bwaitrecvremain = true;

                return 0;

            }

 

            byte byret;

            lock (m_lock)

            {

                byret = m_abybuf[m_iposition];

            }

            m_iposition++;

 

            return byret;

        }

 

        public byte[] getbytearray(int length)

        {

            bwaitrecvremain = false;

 

            if (m_iposition + length > m_irecvlength)

            {

                bwaitrecvremain = true;

                return null;

            }

 

            byte[] ret = new byte[length];

 

            lock (m_lock)

            {

                array.copy(m_abybuf, m_iposition, ret, 0, length);

 

                m_iposition += length;

            }

 

            return ret;

        }

 

        public bool hasremaining()

        {

            return m_iposition < m_irecvlength;

        }

 

        public int remaining()

        {

            return m_irecvlength - m_iposition;

        }

 

        public void clear()

        {

            m_iposition = 0;

            m_irecvlength = 0;

            bwaitrecvremain = false;

        }

 

        ~cbytebuffer()

        {

            m_abybuf = null;

            dispose(false);

        }

 

        protected virtual void dispose(bool disposing)

        {

            if (disposing)

            {

                gc.suppressfinalize(this);

            }

        }

 

        public void dispose()

        {

            dispose(true);

        }

    }

协议解析类

        public void process(cbytebuffer bbuffer, cprotocolanalysis analysis, string sn)

        {

            analysis.bagstatus = cprotocolanalysis.ebagstatus.bagnone;

            analysis.whethertosend = false;

 

            int iposition = bbuffer.position;

            byte head1 = 0; byte head2 = 0; byte head3 = 0; byte head4 = 0; byte head5 = 0; byte head6 = 0; bool headok = false;

 

            if (!bbuffer.hasremaining()) return;

 

            while (bbuffer.hasremaining())

            {

                head1 = bbuffer.getbyte(); if (!analysis.isremaindata(iposition, bbuffer, analysis)) return;

                if (head1 == head1)

                {

                    iposition = bbuffer.position - 1;

                    head2 = bbuffer.getbyte(); if (!analysis.isremaindata(iposition, bbuffer, analysis)) return;

                    head3 = bbuffer.getbyte(); if (!analysis.isremaindata(iposition, bbuffer, analysis)) return;

                    head4 = bbuffer.getbyte(); if (!analysis.isremaindata(iposition, bbuffer, analysis)) return;

                    head5 = bbuffer.getbyte(); if (!analysis.isremaindata(iposition, bbuffer, analysis)) return;

                    head6 = bbuffer.getbyte(); if (!analysis.isremaindata(iposition, bbuffer, analysis)) return;

                    if (head2 == head2 && head3 == head3 && head4 == head4 && head5 == head5 && head6 == head6)

                    {

                        headok = true;

                        break;

                    }

                    else

                    {

                        cloghelp.appendlog("error,unable to parse the data2:position=" + iposition.tostring() + ",index=" + (bbuffer.getposition()).tostring() + ",head2=" + head2.tostring());

                    }

                }

                else

                {

                    cloghelp.appendlog("error,unable to parse the data1:position=" + iposition.tostring() + ",index=" + (bbuffer.getposition()).tostring() + ",head1=" + head1.tostring());

                }

            }

 

            if (!bbuffer.hasremaining())

            {

                if (headok)

                {

                    if (!analysis.isremaindata(iposition, bbuffer, analysis)) return;

                }

                return;

            }

 

            byte[] arrlen = bbuffer.getbytearray(4); if (!analysis.isremaindata(iposition, bbuffer, analysis)) return;

            int len = ccommonfunc.string2int(ccommonfunc.bytetostring(arrlen)); if (-1 == len) return;

            byte[] source = bbuffer.getbytearray(len); if (!analysis.isremaindata(iposition, bbuffer, analysis)) return;

 

            if (!bbuffer.hasremaining())

            {

                bbuffer.clear();

            }

            else

            {

                analysis.bagstatus = cprotocolanalysis.ebagstatus.bagstick;

            }

 

            // #watermeter-001#01##

            string data = ccommonfunc.bytetostring(source);

            if (null == data || 0 == data.length || data.length - 1 != data.lastindexof(split1))

            {

                return;

            }

            data = data.substring(1, data.length - 2);

            string[] item = data.split(split1);

            if (null == item || 4 != item.length)

            {

                return;

            }

            string uid = item[0];

            string taskid = item[1];

            int cmd = ccommonfunc.string2int(item[2]);

            string content = item[3];

            program.addmessage("r: [" + sn + "] cmd=" + cmd.tostring() + " data=" + data);

 

            analysis.cmd = cmd;

            analysis.uid = uid;

            analysis.taskid = taskid;

 

            if (cmd == 1 || cmd == 2 || cmd == 3 || cmd == 4 || cmd == 5 || cmd == 6 || cmd == 7)

            {

                analysis.whethertosend = true;

            }

 

            string softtype = "";  

 

            try

            {

                switch (cmd)

                {

                    case 1:

                        analysis.msg = "ok";

                        break;

                    case 2:

                        analysis.msg = datetime.now.tostring("yyyy-mm-dd hh:mm:ss");

                        break;

                    case 3:

                        // htemp=0263#watermeter-001#1520557004#03#buildid=44@edmid=37@meterid=1228@senddate=2018-02-05 17:36:22@[{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}]#

                        analysis.msg = "ok";

                        break;

                    case 4:

                        {

                            // 获取版本信息

                            softtype = content.split(split2)[1];

                            storagefile(softtype, system.windows.forms.application.startuppath + "\\test.zip");

                            analysis.msg = "2";// version

                        }

                        break;

                    case 5:

                        // 获取包数

                        {

                            softtype = content.split(split2)[1];

                            if (!dicsoft.containskey(softtype))

                            {

                                storagefile(softtype, system.windows.forms.application.startuppath + "\\test.zip");

                            }

                            // 获取包数

                            int count = 0;

                            filecut entity = null;

                            dicsoft.trygetvalue(softtype, out entity);

                            if (null != entity) count = entity.count;

                            analysis.msg = count.tostring();

                        }

                        break;

                    case 6:

                        // 执行更新动作

                        {

                            string[] items = content.split(split2);

                            softtype = items[1];

                            int downindex = ccommonfunc.string2int(items[2]);

                            if (!dicsoft.containskey(softtype))

                            {

                                analysis.msg = "error@" + softtype + " 未找到更新文件,请先获取包数";

                            }

                            else

                            {

                                filecut entity = null;

                                dicsoft.trygetvalue(softtype, out entity);

                                if (null != entity)

                                {

                                    string filedata = "";

                                    entity.data.trygetvalue(downindex, out filedata);

                                    if (string.isnullorempty(filedata))

                                        analysis.msg = "error@" + softtype + " 第" + downindex + "包的数据为空";

                                    else

                                        analysis.msg = filedata;

                                }

                            }

                        }

                        break;

                    case 7:

                        // 更新版本信息(update sql)

                        analysis.msg = "ok";

                        break;

                }

            }

            catch (exception ex)

            {

                analysis.msg = "error@" + ex.message;

            }

            program.addmessage("s: [" + sn + "] cmd=" + cmd.tostring() + " data=" + analysis.msg);

        }

测试效果

正常包

htemp=0026#meter-001#1533022506#01##

 高性能TcpServer - 3.命令通道(处理:掉包,粘包,垃圾包)

 

掉包(分两包发送)

htemp=0026#

meter-001#1533022506#01##

高性能TcpServer - 3.命令通道(处理:掉包,粘包,垃圾包)

 

 

粘包(两包一起发送)

htemp=0026#meter-001#1533022506#01##htemp=0026#meter-001#1533022506#01##

 高性能TcpServer - 3.命令通道(处理:掉包,粘包,垃圾包)