欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

C#使用Protocol Buffer(ProtoBuf)进行Unity中的Socket通信

程序员文章站 2023-10-31 16:30:46
首先来说一下本文中例子所要实现的功能: 基于protobuf序列化对象 使用socket实现时时通信 数据包的编码和解码 下面来看具体的步骤:...

首先来说一下本文中例子所要实现的功能:

  • 基于protobuf序列化对象
  • 使用socket实现时时通信
  • 数据包的编码和解码

下面来看具体的步骤:

一、unity中使用protobuf

导入dll到unity中,
创建网络传输的模型类:

using system;
using protobuf;

//添加特性,表示可以被protobuf工具序列化
[protocontract]
public class netmodel {
 //添加特性,表示该字段可以被序列化,1可以理解为下标
 [protomember(1)] 
 public int id;
 [protomember(2)]
 public string commit;
 [protomember(3)]
 public string message;
}

using system;
using protobuf;
 
//添加特性,表示可以被protobuf工具序列化
[protocontract]
public class netmodel {
 //添加特性,表示该字段可以被序列化,1可以理解为下标
 [protomember(1)] 
 public int id;
 [protomember(2)]
 public string commit;
 [protomember(3)]
 public string message;
}

在unity中添加测试脚本,介绍protobuf工具的使用。

using system;
using system.io;

public class test : monobehaviour {

 void start () {
  //创建对象
  netmodel item = new netmodel(){id = 1, commit = "lanou", message = "unity"};
  //序列化对象
  byte[] temp = serialize(item);
  //protobuf的优势一:小
  debug.log(temp.length);
  //反序列化为对象
  netmodel result = deserialize(temp);
  debug.log(result.message);

 }

 // 将消息序列化为二进制的方法
 // < param name="model">要序列化的对象< /param>
 private byte[] serialize(netmodel model)
 {
  try {
   //涉及格式转换,需要用到流,将二进制序列化到流中
   using (memorystream ms = new memorystream()) {
    //使用protobuf工具的序列化方法
    protobuf.serializer.serialize<netmodel> (ms, model);
    //定义二级制数组,保存序列化后的结果
    byte[] result = new byte[ms.length];
    //将流的位置设为0,起始点
    ms.position = 0;
    //将流中的内容读取到二进制数组中
    ms.read (result, 0, result.length);
    return result;
   }
  } catch (exception ex) {
   debug.log ("序列化失败: " + ex.tostring());
   return null;
  }
 }

 // 将收到的消息反序列化成对象
 // < returns>the serialize.< /returns>
 // < param name="msg">收到的消息.</param>
 private netmodel deserialize(byte[] msg)
 {
  try {
   using (memorystream ms = new memorystream()) {
    //将消息写入流中
    ms.write (msg, 0, msg.length);
    //将流的位置归0
    ms.position = 0;
    //使用工具反序列化对象
    netmodel result = protobuf.serializer.deserialize<netmodel> (ms);
    return result;
   }
  } catch (exception ex) {  
    debug.log("反序列化失败: " + ex.tostring());
    return null;
  }
 }
}

using system;
using system.io;
 
public class test : monobehaviour {
 
 void start () {
  //创建对象
  netmodel item = new netmodel(){id = 1, commit = "lanou", message = "unity"};
  //序列化对象
  byte[] temp = serialize(item);
  //protobuf的优势一:小
  debug.log(temp.length);
  //反序列化为对象
  netmodel result = deserialize(temp);
  debug.log(result.message);
 
 }
 
 // 将消息序列化为二进制的方法
 // < param name="model">要序列化的对象< /param>
 private byte[] serialize(netmodel model)
 {
  try {
   //涉及格式转换,需要用到流,将二进制序列化到流中
   using (memorystream ms = new memorystream()) {
    //使用protobuf工具的序列化方法
    protobuf.serializer.serialize<netmodel> (ms, model);
    //定义二级制数组,保存序列化后的结果
    byte[] result = new byte[ms.length];
    //将流的位置设为0,起始点
    ms.position = 0;
    //将流中的内容读取到二进制数组中
    ms.read (result, 0, result.length);
    return result;
   }
  } catch (exception ex) {
   debug.log ("序列化失败: " + ex.tostring());
   return null;
  }
 }
 
 // 将收到的消息反序列化成对象
 // < returns>the serialize.< /returns>
 // < param name="msg">收到的消息.</param>
 private netmodel deserialize(byte[] msg)
 {
  try {
   using (memorystream ms = new memorystream()) {
    //将消息写入流中
    ms.write (msg, 0, msg.length);
    //将流的位置归0
    ms.position = 0;
    //使用工具反序列化对象
    netmodel result = protobuf.serializer.deserialize<netmodel> (ms);
    return result;
   }
  } catch (exception ex) {  
    debug.log("反序列化失败: " + ex.tostring());
    return null;
  }
 }
}

二、unity中使用socket实现时时通信

通信应该实现的功能:

  • 服务器可以时时监听多个客户端
  • 服务器可以时时监听某一个客户端消息
  • 服务器可以时时给某一个客户端发消息
  • 首先我们需要定义一个客户端对象
using system;
using system.net.sockets;

// 表示一个客户端
public class netusertoken {
 //连接客户端的socket
 public socket socket;
 //用于存放接收数据
 public byte[] buffer;

 public netusertoken()
 {
  buffer = new byte[1024];
 }

 // 接受消息
 // < param name="data">data.< /param>
 public void receive(byte[] data)
 {
  unityengine.debug.log("接收到消息!");
 }

 // 发送消息
 //< param name="data">data.< /param>
 public void send(byte[] data)
 {  

 }
}

using system;
using system.net.sockets;
 
// 表示一个客户端
public class netusertoken {
 //连接客户端的socket
 public socket socket;
 //用于存放接收数据
 public byte[] buffer;
 
 public netusertoken()
 {
  buffer = new byte[1024];
 }
 
 // 接受消息
 // < param name="data">data.< /param>
 public void receive(byte[] data)
 {
  unityengine.debug.log("接收到消息!");
 }
 
 // 发送消息
 //< param name="data">data.< /param>
 public void send(byte[] data)
 {  
 
 }
}


然后实现我们的服务器代码

using system.collections;
using system.collections.generic;
using system.net;
using system;
using system.net.sockets;

public class netserver{
 //单例脚本
 public static readonly netserver instance = new netserver();
 //定义tcp服务器
 private socket server;
 private int maxclient = 10;
 //定义端口
 private int port = 35353;
 //用户池
 private stack<netusertoken> pools;
 private netserver()
 {
  //初始化socket
  server = new socket(addressfamily.internetwork, sockettype.stream, protocoltype.tcp);
  server.bind(new ipendpoint(ipaddress.any, port));

 }

 //开启服务器
 public void start()
 {
  server.listen(maxclient);
  unityengine.debug.log("server ok!");
  //实例化客户端的用户池
  pools = new stack<netusertoken>(maxclient);
  for(int i = 0; i < maxclient; i++)
  {
   netusertoken usertoken = new netusertoken();
   pools.push(usertoken);
  }
  //可以异步接受客户端, beginaccept函数的第一个参数是回调函数,当有客户端连接的时候自动调用
  server.beginaccept (asyncaccept, null);
 }

 //回调函数, 有客户端连接的时候会自动调用此方法
 private void asyncaccept(iasyncresult result)
 {
  try {
   //结束监听,同时获取到客户端
   socket client = server.endaccept(result);
   unityengine.debug.log("有客户端连接");
   //来了一个客户端
   netusertoken usertoken = pools.pop();
   usertoken.socket = client;
   //客户端连接之后,可以接受客户端消息
   beginreceive(usertoken);

   //尾递归,再次监听是否还有其他客户端连入
   server.beginaccept(asyncaccept, null);
  } catch (exception ex) {
   unityengine.debug.log(ex.tostring());
  }
 }

 //异步监听消息
 private void beginreceive(netusertoken usertoken)
 {
  try {
   //异步方法
   usertoken.socket.beginreceive(usertoken.buffer, 0, usertoken.buffer.length, socketflags.none,
           endreceive, usertoken);
  } catch (exception ex) {
   unityengine.debug.log(ex.tostring());
  }
 }

 //监听到消息之后调用的函数
 private void endreceive(iasyncresult result)
 {
  try {
   //取出客户端
   netusertoken usertoken = result.asyncstate as netusertoken;
   //获取消息的长度
   int len = usertoken.socket.endreceive(result);
   if(len > 0)
   { 
    byte[] data = new byte[len];
    buffer.blockcopy(usertoken.buffer, 0, data, 0, len);
    //用户接受消息
    usertoken.receive(data);
    //尾递归,再次监听客户端消息
    beginreceive(usertoken);
   }

  } catch (exception ex) {
   unityengine.debug.log(ex.tostring());
  }
 }
}

using system.collections;
using system.collections.generic;
using system.net;
using system;
using system.net.sockets;
 
public class netserver{
 //单例脚本
 public static readonly netserver instance = new netserver();
 //定义tcp服务器
 private socket server;
 private int maxclient = 10;
 //定义端口
 private int port = 35353;
 //用户池
 private stack<netusertoken> pools;
 private netserver()
 {
  //初始化socket
  server = new socket(addressfamily.internetwork, sockettype.stream, protocoltype.tcp);
  server.bind(new ipendpoint(ipaddress.any, port));
 
 }
 
 //开启服务器
 public void start()
 {
  server.listen(maxclient);
  unityengine.debug.log("server ok!");
  //实例化客户端的用户池
  pools = new stack<netusertoken>(maxclient);
  for(int i = 0; i < maxclient; i++)
  {
   netusertoken usertoken = new netusertoken();
   pools.push(usertoken);
  }
  //可以异步接受客户端, beginaccept函数的第一个参数是回调函数,当有客户端连接的时候自动调用
  server.beginaccept (asyncaccept, null);
 }
 
 //回调函数, 有客户端连接的时候会自动调用此方法
 private void asyncaccept(iasyncresult result)
 {
  try {
   //结束监听,同时获取到客户端
   socket client = server.endaccept(result);
   unityengine.debug.log("有客户端连接");
   //来了一个客户端
   netusertoken usertoken = pools.pop();
   usertoken.socket = client;
   //客户端连接之后,可以接受客户端消息
   beginreceive(usertoken);
 
   //尾递归,再次监听是否还有其他客户端连入
   server.beginaccept(asyncaccept, null);
  } catch (exception ex) {
   unityengine.debug.log(ex.tostring());
  }
 }
 
 //异步监听消息
 private void beginreceive(netusertoken usertoken)
 {
  try {
   //异步方法
   usertoken.socket.beginreceive(usertoken.buffer, 0, usertoken.buffer.length, socketflags.none,
           endreceive, usertoken);
  } catch (exception ex) {
   unityengine.debug.log(ex.tostring());
  }
 }
 
 //监听到消息之后调用的函数
 private void endreceive(iasyncresult result)
 {
  try {
   //取出客户端
   netusertoken usertoken = result.asyncstate as netusertoken;
   //获取消息的长度
   int len = usertoken.socket.endreceive(result);
   if(len > 0)
   { 
    byte[] data = new byte[len];
    buffer.blockcopy(usertoken.buffer, 0, data, 0, len);
    //用户接受消息
    usertoken.receive(data);
    //尾递归,再次监听客户端消息
    beginreceive(usertoken);
   }
 
  } catch (exception ex) {
   unityengine.debug.log(ex.tostring());
  }
 }
}


在unity中开启服务器,并使用c#控制台模拟客户端连接、发送消息操作。测试ok了,unity中可以时时监听到消息。

using unityengine;
using system.collections;

public class createserver : monobehaviour {

 void startserver () {
  netserver.instance.start();
 }

}

//c#控制台工程

using system;
using system.net;
using system.net.sockets;
using system.io;
using system.text;

namespace temp
{
 class mainclass
 {
  public static void main (string[] args)
  {
   tcpclient tc = new tcpclient();
   ipendpoint ip = new ipendpoint(ipaddress.parse("127.0.0.1"), 35353);
   tc.connect(ip);

   if(tc.connected)
   {
    while(true)
    {

     string msg = console.readline();
     byte[] result = encoding.utf8.getbytes(msg);
     tc.getstream().write(result, 0, result.length);
    }
   }
   console.readline();
  }
 }
}

using unityengine;
using system.collections;
 
public class createserver : monobehaviour {
 
 void startserver () {
  netserver.instance.start();
 }
 
}
 
//c#控制台工程
 
using system;
using system.net;
using system.net.sockets;
using system.io;
using system.text;
 
namespace temp
{
 class mainclass
 {
  public static void main (string[] args)
  {
   tcpclient tc = new tcpclient();
   ipendpoint ip = new ipendpoint(ipaddress.parse("127.0.0.1"), 35353);
   tc.connect(ip);
 
   if(tc.connected)
   {
    while(true)
    {
 
     string msg = console.readline();
     byte[] result = encoding.utf8.getbytes(msg);
     tc.getstream().write(result, 0, result.length);
    }
   }
   console.readline();
  }
 }
}

三、数据包的编码和解码

首先,举个例子,这个月信用卡被媳妇刷爆了,面对房贷车贷的压力,我只能选择分期付款。。。

那么ok了,现在我想问一下,当服务器向客户端发送的数据过大时怎么办呢?

当服务器需要向客户端发送一条很长的数据,也会“分期付款!”,服务器会把一条很长的数据分成若干条小数据,多次发送给客户端。

可是,这样就又有另外一个问题,客户端接受到多条数据之后如何解析?

这里其实就是客户端的解码。server发数据一般采用“长度+内容”的格式,client接收到数据之后,先提取出长度来,然后根据长度判断内容是否发送完毕。

再次重申,用户在发送序列化好的消息的前,需要先编码后再发送消息;用户在接受消息后,需要解码之后再解析数据(反序列化)。

using unityengine;
using system.collections.generic;
using system.io;

// 编码和解码
public class netencode {

 // 将数据编码 长度+内容
 /// < param name="data">内容< /param>
 public static byte[] encode(byte[] data)
 {
  //整形占四个字节,所以声明一个+4的数组
  byte[] result = new byte[data.length + 4];
  //使用流将编码写二进制
  memorystream ms = new memorystream();
  binarywriter br = new binarywriter(ms);
  br.write(data.length);
  br.write(data);
  //将流中的内容复制到数组中
  system.buffer.blockcopy(ms.toarray(), 0, result, 0, (int)ms.length);
  br.close();
  ms.close();
  return result;
 }

 // 将数据解码
 // < param name="cache">消息队列< /param>
 public static byte[] decode(ref list<byte> cache)
 {
  //首先要获取长度,整形4个字节,如果字节数不足4个字节
  if(cache.count < 4)
  {
   return null;
  }
  //读取数据
  memorystream ms = new memorystream(cache.toarray());
  binaryreader br = new binaryreader(ms);
  int len = br.readint32();
  //根据长度,判断内容是否传递完毕
  if(len > ms.length - ms.position)
  {
   return null;
  }
  //获取数据
  byte[] result = br.readbytes(len);
  //清空消息池
  cache.clear();
  //讲剩余没处理的消息存入消息池
  cache.addrange(br.readbytes((int)ms.length - (int)ms.position));

  return result;
 }
}

using unityengine;
using system.collections.generic;
using system.io;
 
// 编码和解码
public class netencode {
 
 // 将数据编码 长度+内容
 /// < param name="data">内容< /param>
 public static byte[] encode(byte[] data)
 {
  //整形占四个字节,所以声明一个+4的数组
  byte[] result = new byte[data.length + 4];
  //使用流将编码写二进制
  memorystream ms = new memorystream();
  binarywriter br = new binarywriter(ms);
  br.write(data.length);
  br.write(data);
  //将流中的内容复制到数组中
  system.buffer.blockcopy(ms.toarray(), 0, result, 0, (int)ms.length);
  br.close();
  ms.close();
  return result;
 }
 
 // 将数据解码
 // < param name="cache">消息队列< /param>
 public static byte[] decode(ref list<byte> cache)
 {
  //首先要获取长度,整形4个字节,如果字节数不足4个字节
  if(cache.count < 4)
  {
   return null;
  }
  //读取数据
  memorystream ms = new memorystream(cache.toarray());
  binaryreader br = new binaryreader(ms);
  int len = br.readint32();
  //根据长度,判断内容是否传递完毕
  if(len > ms.length - ms.position)
  {
   return null;
  }
  //获取数据
  byte[] result = br.readbytes(len);
  //清空消息池
  cache.clear();
  //讲剩余没处理的消息存入消息池
  cache.addrange(br.readbytes((int)ms.length - (int)ms.position));
 
  return result;
 }
}

用户接受数据代码如下:

using system;
using system.collections.generic;
using system.net.sockets;

// 表示一个客户端
public class netusertoken {
 //连接客户端的socket
 public socket socket;
 //用于存放接收数据
 public byte[] buffer;
 //每次接受和发送数据的大小
 private const int size = 1024;

 //接收数据池
 private list<byte> receivecache;
 private bool isreceiving;
 //发送数据池
 private queue<byte[]> sendcache;
 private bool issending;

 //接收到消息之后的回调
 public action<netmodel> receivecallback;


 public netusertoken()
 {
  buffer = new byte[size];
  receivecache = new list<byte>();
  sendcache = new queue<byte[]>();
 }

 // 服务器接受客户端发送的消息
 // < param name="data">data.< /param>
 public void receive(byte[] data)
 {
  unityengine.debug.log("接收到数据");
  //将接收到的数据放入数据池中
  receivecache.addrange(data);
  //如果没在读数据
  if(!isreceiving)
  {
   isreceiving = true;
   readdata();
  }
 }

 // 读取数据
 private void readdata()
 {
  byte[] data = netencode.decode(ref receivecache);
  //说明数据保存成功
  if(data != null)
  {
   netmodel item = netserilizer.deserialize(data);
   unityengine.debug.log(item.message);
   if(receivecallback != null)
   {
    receivecallback(item);
   }
   //尾递归,继续读取数据
   readdata();
  }
  else
  {
   isreceiving = false;
  }
 }

 // 服务器发送消息给客户端
 public void send()
 {
  try {
   if (sendcache.count == 0) {
    issending = false;
    return; 
   }
   byte[] data = sendcache.dequeue ();
   int count = data.length / size;
   int len = size;
   for (int i = 0; i < count + 1; i++) {
    if (i == count) {
     len = data.length - i * size;
    }
    socket.send (data, i * size, len, socketflags.none);
   }
   unityengine.debug.log("发送成功!");
   send ();
  } catch (exception ex) {
   unityengine.debug.log(ex.tostring());
  }
 }

 public void writesenddate(byte[] data){
  sendcache.enqueue(data);
  if(!issending)
  {
   issending = true;
   send();
  }
 }
}

using system;
using system.collections.generic;
using system.net.sockets;
 
// 表示一个客户端
public class netusertoken {
 //连接客户端的socket
 public socket socket;
 //用于存放接收数据
 public byte[] buffer;
 //每次接受和发送数据的大小
 private const int size = 1024;
 
 //接收数据池
 private list<byte> receivecache;
 private bool isreceiving;
 //发送数据池
 private queue<byte[]> sendcache;
 private bool issending;
 
 //接收到消息之后的回调
 public action<netmodel> receivecallback;
 
 
 public netusertoken()
 {
  buffer = new byte[size];
  receivecache = new list<byte>();
  sendcache = new queue<byte[]>();
 }
 
 // 服务器接受客户端发送的消息
 // < param name="data">data.< /param>
 public void receive(byte[] data)
 {
  unityengine.debug.log("接收到数据");
  //将接收到的数据放入数据池中
  receivecache.addrange(data);
  //如果没在读数据
  if(!isreceiving)
  {
   isreceiving = true;
   readdata();
  }
 }
 
 // 读取数据
 private void readdata()
 {
  byte[] data = netencode.decode(ref receivecache);
  //说明数据保存成功
  if(data != null)
  {
   netmodel item = netserilizer.deserialize(data);
   unityengine.debug.log(item.message);
   if(receivecallback != null)
   {
    receivecallback(item);
   }
   //尾递归,继续读取数据
   readdata();
  }
  else
  {
   isreceiving = false;
  }
 }
 
 // 服务器发送消息给客户端
 public void send()
 {
  try {
   if (sendcache.count == 0) {
    issending = false;
    return; 
   }
   byte[] data = sendcache.dequeue ();
   int count = data.length / size;
   int len = size;
   for (int i = 0; i < count + 1; i++) {
    if (i == count) {
     len = data.length - i * size;
    }
    socket.send (data, i * size, len, socketflags.none);
   }
   unityengine.debug.log("发送成功!");
   send ();
  } catch (exception ex) {
   unityengine.debug.log(ex.tostring());
  }
 }
 
 public void writesenddate(byte[] data){
  sendcache.enqueue(data);
  if(!issending)
  {
   issending = true;
   send();
  }
 }
}

protobuf网络传输到这里就全部完成了。