mkdir rpcstreaming cd rpcstreaming dotnet new console dotnet add package grpc // 添加 grpc 包 dotnet add package grpc.tools // 添加 grpc 工具包 dotnet add package google.protobuf // 添加 protobuf 支持
然后为了支持 protobuf 语言,我们需要修改项目配置文件,在项目中引入 .proto 文件以便生成对应的代码。
<project sdk="microsoft.net.sdk"> ... <propertygroup> ... <langversion>latest</langversion> ... </propertygroup> <itemgroup> ... <protobuf include="**/*.proto" /> ... </itemgroup> ... </project>
这里我们使用了 wildcard 语法匹配了项目内的全部 proto 文件用于生成对应的代码。
1 synatx = "proto3"; 2 service rpcstreamingservice { 3 rpc getstreamcontent (streamrequest) returns (stream streamcontent) {} 4 } 5 message streamrequest { 6 string filename = 1; 7 } 8 message streamcontent { 9 bytes content = 1; 10 }
做 rpc 请求时,我们向 rpc 服务器发送一个 streamrequest 的 message,其中包含了文件路径;为了让服务器以流式传输数据,我们在 returns 内加一个 “stream”。
public virtual global::system.threading.tasks.task getstreamcontent(global::streamrequest request, grpc::iserverstreamwriter<global::streamcontent> responsestream, grpc::servercallcontext context) { throw new grpc::rpcexception(new grpc::status(grpc::statuscode.unimplemented, "")); }
这样就简单了,我们新建一个类 rpcserviceimpl,继承 rpcstreamingservice.rpcstreamingservicebase,然后实现对应的方法即可。
为了串流,我们需要将数据流不断写入 response,这里给一个简单的示例。1 using system; 2 using system.io; 3 using system.threading.tasks; 4 using google.protobuf; 5 using grpc.core; 6 namespace rpcstreaming 7 { 8 public class rpcstreamingserviceimpl : rpcstreamingservice.rpcstreamingservicebase 9 { 10 public override task getstreamcontent(streamrequest request, iserverstreamwriter<streamcontent> response, servercallcontext context) 11 { 12 return task.run(async () => 13 { 14 using (var fs = file.open(request.filename, filemode.open)) // 从 request 中读取文件名并打开文件流 15 { 16 var remaininglength = fs.length; // 剩余长度 17 var buff = new byte[1048576]; // 缓冲区,这里我们设置为 1 mb 18 while (remaininglength > 0) // 若未读完则继续读取 19 { 20 var len = await fs.readasync(buff); // 异步从文件中读取数据到缓冲区中 21 remaininglength -= len; // 剩余长度减去刚才实际读取的长度 22 23 // 向流中写入我们刚刚读取的数据 24 await response.writeasync(new streamcontent 25 { 26 content = bytestring.copyfrom(buff, 0, len) 27 }); 28 } 29 } 30 }); 31 } 32 } 33 }
1 using google.protobuf; 2 using grpc.core;
然后我们在 main 函数中构建并启动 rpc server,监听 localhost:23333
1 new server 2 { 3 services = { rpcstreamingservice.bindservice(new rpcstreamingserviceimpl()) }, // 绑定我们的实现 4 ports = { new serverport("localhost", 23333, servercredentials.insecure) } 5 }.start(); 6 console.readkey();
这样服务端就构建完成了。
1 // 原来的 main 函数 2 static void main(string[] args) { ... } 3 // 改写后的 main 函数 4 static async task main(string[] args) { ... }
另外,还需要:
1 using system; 2 using system.io; 3 using system.threading.tasks; 4 using google.protobuf; 5 using grpc.core;
然后我们在 main 函数中添加调用代码:
1 var channel = new channel("localhost:23333", channelcredentials.insecure); // 建立到 localhost:23333 的 channel 2 var client = new rpcstreamingservice.rpcstreamingserviceclient(channel); // 建立 client 3 // 调用 rpc api 4 var result = client.getstreamcontent(new streamrequest { filename = "你想获取的文件路径" }); 5 var iter = result.responsestream; // 拿到响应流 6 using (var fs = new filestream("写获取的数据的文件路径", filemode.create)) // 新建一个文件流用于存放我们获取到数据 7 { 8 while (await iter.movenext()) // 迭代 9 { 10 iter.current.content.writeto(fs); // 将数据写入到文件流中 11 } 12 }
dotnet run
会发现,我们想要获取的文件的数据被不断地写到我们指定的文件中,每次 1 mb。在我的电脑上测试,内网的环境下传输速度大概 80~90 mb/s,几乎跑满了我的千兆网卡,速度非常理想。