RPC原理以及GRPC详解
一、rpc原理
1、rpc框架由来
单体应用体量越来越大,代码不好维护和管理,所以就产生了微服务架构,按照公共或功能模块拆分为一个个独立的服务,然后各独立的服务之间可以相互调用。
微服务之间相互调用,该如何实现?
首先要解决下面5个问题:
1、如何规定远程调用的语法?
2、如何传递参数?
3、如何表示数据?
4、如何知道一个服务端都实现了哪些远程调用?从哪个端口可以访问这个远程调用?
5、发生了错误、重传、丢包、性能等问题怎么办?
大家可能都写过socket或则http通信,简单的client访问server的模式,认为通过这个就可以解决服务之间的相互调用了,但是考虑下上面5个问题,处理起来就不是那么容易的事情了,非个人可以完成的工作。
于是就诞生了rpc框架,让我们不用管底层实现,简单好用:
2、rpc框架原理
当客户端的应用想发起一个远程调用时,它实际是调用客户端的 stub。它负责将调用的接口、方法和参数,通过约定的协议规范进行编码,并通过本地的 rpcruntime 进行传输,将调用网络包发送到服务器。服务器端的 rpcruntime 收到请求后,交给服务器端的 stub 进行解码,然后调用服务端的方法,服务端执行方法,返回结果,服务器端的 stub 将返回结果编码后,发送给客户端,客户端的 rpcruntime 收到结果,发给客户端的 stub 解码得到结果,返回给客户端。
1、对于客户端而言,这些过程是透明的,就像本地调用一样;对于服务端而言,专注于业务逻辑的处理就可以了。
2、对于 stub 层,处理双方约定好的语法、语义、封装、解封装。
3、对于 rpcruntime,主要处理高性能的传输,以及网络的错误和异常。
来看一下rpc框架是如何解决上面5个问题的:1、2、3的问题可以由stub层解决,4的问题可以由服务注册和发布解决,5的问题可以由rpcruntime解决。
二、grpc原理
grpc 是一个高性能、开源和通用的 rpc 框架,面向移动和 http/2 设计。目前提供 c、java 和 go 语言版本,分别是:grpc, grpc-java, grpc-go. 其中 c 版本支持 c, c++, node.js, python, ruby, objective-c, php 和 c# 支持。
本文以go语言版本讲解
1、golang安装grpc
2、protocol buffer原理文章
3、grpc-go github地址:
grpc-go 的stub层协议约定问题通过.proto文件约定好服务接口、参数等,通过工具protoc-gen-go生成客户端和服务端共用的对照表,想生成什么语言文件就用相应的插件,这样就实现了跨语言。
生成go语言文件使用命令如下:
protoc --go_out=plugins=grpc:. *.proto
grpc rpcruntime层基于http/2设计,带来诸如双向流、流控、头部压缩、单 tcp 连接上的多复用请求等特性。
grpc server端启动
1、整体启动过程
func main() { //解析运行参数 flag.parse() //配置监听协议、地址、端口 lis, err := net.listen("tcp", fmt.sprintf("localhost:%d", *port)) if err != nil { log.fatalf("failed to listen: %v", err) } //grpc额外的服务配置,这里主要是需不需要加密 var opts []grpc.serveroption if *tls { if *certfile == "" { *certfile = testdata.path("server1.pem") } if *keyfile == "" { *keyfile = testdata.path("server1.key") } creds, err := credentials.newservertlsfromfile(*certfile, *keyfile) if err != nil { log.fatalf("failed to generate credentials %v", err) } opts = []grpc.serveroption{grpc.creds(creds)} } //grpc服务初始化,绑定一些配置参数 grpcserver := grpc.newserver(opts...) //把.proto文件中定义的接口api实现注册到grpc服务上,方便调用 pb.registerrouteguideserver(grpcserver, newserver()) //grpc服务启动,开始监听 grpcserver.serve(lis) }
2、serve函数
关键处理就是一个for循环。如果accept() 返回错误,并且错误是临时性的,那么会有重试,重试时间以5ms翻倍增长,直到1s。
for { rawconn, err := lis.accept() //错误处理 if err != nil { if ne, ok := err.(interface { temporary() bool }); ok && ne.temporary() { if tempdelay == 0 { tempdelay = 5 * time.millisecond } else { tempdelay *= 2 } if max := 1 * time.second; tempdelay > max { tempdelay = max } s.mu.lock() s.printf("accept error: %v; retrying in %v", err, tempdelay) s.mu.unlock() timer := time.newtimer(tempdelay) select { case <-timer.c: case <-s.quit.done(): timer.stop() return nil } continue } s.mu.lock() s.printf("done serving; accept = %v", err) s.mu.unlock() if s.quit.hasfired() { return nil } return err } tempdelay = 0 // start a new goroutine to deal with rawconn so we don't stall this accept // loop goroutine. // // make sure we account for the goroutine so gracefulstop doesn't nil out // s.conns before this conn can be added. s.servewg.add(1) //重新启动一个goroutine处理accept的连接 go func() { s.handlerawconn(rawconn) s.servewg.done() }() }
3、handlerawconn函数
主要作用就是获取一个服务端的transport,并开一个goroutine等待处理stream,里面会涉及到调用注册的方法。
st := s.newhttp2transport(conn, authinfo) if st == nil { return } rawconn.setdeadline(time.time{}) if !s.addconn(st) { return } go func() { s.servestreams(st) s.removeconn(st) }()
grpc client端启动
1、建立连接和绑定实现的接口
//解析运行参数 flag.parse() //连接的一些配置,主要是加密,安全、阻塞 var opts []grpc.dialoption if *tls { if *cafile == "" { *cafile = testdata.path("ca.pem") } creds, err := credentials.newclienttlsfromfile(*cafile, *serverhostoverride) if err != nil { log.fatalf("failed to create tls credentials %v", err) } opts = append(opts, grpc.withtransportcredentials(creds)) } else { opts = append(opts, grpc.withinsecure()) } opts = append(opts, grpc.withblock()) //建立一个连接 conn, err := grpc.dial(*serveraddr, opts...) if err != nil { log.fatalf("fail to dial: %v", err) } defer conn.close() //创建一个实现了.proto文件定义的接口api的client client := pb.newrouteguideclient(conn)
2、client调用方式
unary rpc: 一元rpc
func (c *routeguideclient) getfeature(ctx context.context, in *point, opts ...grpc.calloption) (*feature, error) { out := new(feature) err := c.cc.invoke(ctx, "/routeguide.routeguide/getfeature", in, out, opts...) if err != nil { return nil, err } return out, nil } // printfeature gets the feature for the given point. func printfeature(client pb.routeguideclient, point *pb.point) { log.printf("getting feature for point (%d, %d)", point.latitude, point.longitude) ctx, cancel := context.withtimeout(context.background(), 10*time.second) defer cancel() feature, err := client.getfeature(ctx, point) if err != nil { log.fatalf("%v.getfeatures(_) = _, %v: ", client, err) } log.println(feature) } // getfeature returns the feature at the given point. func (s *routeguideserver) getfeature(ctx context.context, point *pb.point) (*pb.feature, error) { for _, feature := range s.savedfeatures { if proto.equal(feature.location, point) { return feature, nil } } // no feature was found, return an unnamed feature return &pb.feature{location: point}, nil }
server-side streaming rpc: 服务端流式rpc
func (c *routeguideclient) listfeatures(ctx context.context, in *rectangle, opts ...grpc.calloption) (routeguide_listfeaturesclient, error) { stream, err := c.cc.newstream(ctx, &_routeguide_servicedesc.streams[0], "/routeguide.routeguide/listfeatures", opts...) if err != nil { return nil, err } x := &routeguidelistfeaturesclient{stream} if err := x.clientstream.sendmsg(in); err != nil { return nil, err } if err := x.clientstream.closesend(); err != nil { return nil, err } return x, nil } // printfeatures lists all the features within the given bounding rectangle. func printfeatures(client pb.routeguideclient, rect *pb.rectangle) { log.printf("looking for features within %v", rect) ctx, cancel := context.withtimeout(context.background(), 10*time.second) defer cancel() stream, err := client.listfeatures(ctx, rect) if err != nil { log.fatalf("%v.listfeatures(_) = _, %v", client, err) } for { feature, err := stream.recv() if err == io.eof { break } if err != nil { log.fatalf("%v.listfeatures(_) = _, %v", client, err) } log.println(feature) } } // listfeatures lists all features contained within the given bounding rectangle. func (s *routeguideserver) listfeatures(rect *pb.rectangle, stream pb.routeguide_listfeaturesserver) error { for _, feature := range s.savedfeatures { if inrange(feature.location, rect) { if err := stream.send(feature); err != nil { return err } } } return nil }
client-side streaming rpc: 客户端流式rpc
func (c *routeguideclient) recordroute(ctx context.context, opts ...grpc.calloption) (routeguide_recordrouteclient, error) { stream, err := c.cc.newstream(ctx, &_routeguide_servicedesc.streams[1], "/routeguide.routeguide/recordroute", opts...) if err != nil { return nil, err } x := &routeguiderecordrouteclient{stream} return x, nil } // runrecordroute sends a sequence of points to server and expects to get a routesummary from server. func runrecordroute(client pb.routeguideclient) { // create a random number of random points r := rand.new(rand.newsource(time.now().unixnano())) pointcount := int(r.int31n(100)) + 2 // traverse at least two points var points []*pb.point for i := 0; i < pointcount; i++ { points = append(points, randompoint(r)) } log.printf("traversing %d points.", len(points)) ctx, cancel := context.withtimeout(context.background(), 10*time.second) defer cancel() stream, err := client.recordroute(ctx) if err != nil { log.fatalf("%v.recordroute(_) = _, %v", client, err) } for _, point := range points { if err := stream.send(point); err != nil { log.fatalf("%v.send(%v) = %v", stream, point, err) } } reply, err := stream.closeandrecv() if err != nil { log.fatalf("%v.closeandrecv() got error %v, want %v", stream, err, nil) } log.printf("route summary: %v", reply) } // recordroute records a route composited of a sequence of points. // // it gets a stream of points, and responds with statistics about the "trip": // number of points, number of known features visited, total distance traveled, and // total time spent. func (s *routeguideserver) recordroute(stream pb.routeguide_recordrouteserver) error { var pointcount, featurecount, distance int32 var lastpoint *pb.point starttime := time.now() for { point, err := stream.recv() if err == io.eof { endtime := time.now() return stream.sendandclose(&pb.routesummary{ pointcount: pointcount, featurecount: featurecount, distance: distance, elapsedtime: int32(endtime.sub(starttime).seconds()), }) } if err != nil { return err } pointcount++ for _, feature := range s.savedfeatures { if proto.equal(feature.location, point) { featurecount++ } } if lastpoint != nil { distance += calcdistance(lastpoint, point) } lastpoint = point } }
bidirectional streaming rpc : 双向流式rpc
func (c *routeguideclient) routechat(ctx context.context, opts ...grpc.calloption) (routeguide_routechatclient, error) { stream, err := c.cc.newstream(ctx, &_routeguide_servicedesc.streams[2], "/routeguide.routeguide/routechat", opts...) if err != nil { return nil, err } x := &routeguideroutechatclient{stream} return x, nil } // runroutechat receives a sequence of route notes, while sending notes for various locations. func runroutechat(client pb.routeguideclient) { notes := []*pb.routenote{ {location: &pb.point{latitude: 0, longitude: 1}, message: "first message"}, {location: &pb.point{latitude: 0, longitude: 2}, message: "second message"}, {location: &pb.point{latitude: 0, longitude: 3}, message: "third message"}, {location: &pb.point{latitude: 0, longitude: 1}, message: "fourth message"}, {location: &pb.point{latitude: 0, longitude: 2}, message: "fifth message"}, {location: &pb.point{latitude: 0, longitude: 3}, message: "sixth message"}, } ctx, cancel := context.withtimeout(context.background(), 10*time.second) defer cancel() stream, err := client.routechat(ctx) if err != nil { log.fatalf("%v.routechat(_) = _, %v", client, err) } waitc := make(chan struct{}) go func() { for { in, err := stream.recv() if err == io.eof { // read done. close(waitc) return } if err != nil { log.fatalf("failed to receive a note : %v", err) } log.printf("got message %s at point(%d, %d)", in.message, in.location.latitude, in.location.longitude) } }() for _, note := range notes { if err := stream.send(note); err != nil { log.fatalf("failed to send a note: %v", err) } } stream.closesend() <-waitc } // routechat receives a stream of message/location pairs, and responds with a stream of all // previous messages at each of those locations. func (s *routeguideserver) routechat(stream pb.routeguide_routechatserver) error { for { in, err := stream.recv() if err == io.eof { return nil } if err != nil { return err } key := serialize(in.location) s.mu.lock() s.routenotes[key] = append(s.routenotes[key], in) // note: this copy prevents blocking other clients while serving this one. // we don't need to do a deep copy, because elements in the slice are // insert-only and never modified. rn := make([]*pb.routenote, len(s.routenotes[key])) copy(rn, s.routenotes[key]) s.mu.unlock() for _, note := range rn { if err := stream.send(note); err != nil { return err } } } }
client 连接底层两个主要方法
1、invoke函数
newclientstream:获取传输层 trasport 并组合封装到 clientstream 中返回,在这块会涉及负载均衡、超时控制、 encoding、 stream 的动作,与服务端基本一致的行为。
cs.sendmsg:发送 rpc 请求出去,但其并不承担等待响应的功能。
cs.recvmsg:阻塞等待接受到的 rpc 方法响应结果。
// invoke sends the rpc request on the wire and returns after response is // received. this is typically called by generated code. // // all errors returned by invoke are compatible with the status package. func (cc *clientconn) invoke(ctx context.context, method string, args, reply interface{}, opts ...calloption) error { // allow interceptor to see all applicable call options, which means those // configured as defaults from dial option as well as per-call options opts = combine(cc.dopts.calloptions, opts) if cc.dopts.unaryint != nil { return cc.dopts.unaryint(ctx, method, args, reply, cc, invoke, opts...) } return invoke(ctx, method, args, reply, cc, opts...) } func invoke(ctx context.context, method string, req, reply interface{}, cc *clientconn, opts ...calloption) error { cs, err := newclientstream(ctx, unarystreamdesc, cc, method, opts...) if err != nil { return err } if err := cs.sendmsg(req); err != nil { return err } return cs.recvmsg(reply) }
2、newstream函数
// newstream creates a new stream for the client side. this is typically // called by generated code. ctx is used for the lifetime of the stream. // // to ensure resources are not leaked due to the stream returned, one of the following // actions must be performed: // // 1. call close on the clientconn. // 2. cancel the context provided. // 3. call recvmsg until a non-nil error is returned. a protobuf-generated // client-streaming rpc, for instance, might use the helper function // closeandrecv (note that closesend does not recv, therefore is not // guaranteed to release all resources). // 4. receive a non-nil, non-io.eof error from header or sendmsg. // // if none of the above happen, a goroutine and a context will be leaked, and grpc // will not call the optionally-configured stats handler with a stats.end message. func (cc *clientconn) newstream(ctx context.context, desc *streamdesc, method string, opts ...calloption) (clientstream, error) { // allow interceptor to see all applicable call options, which means those // configured as defaults from dial option as well as per-call options opts = combine(cc.dopts.calloptions, opts) if cc.dopts.streamint != nil { return cc.dopts.streamint(ctx, desc, cc, method, newclientstream, opts...) } return newclientstream(ctx, desc, cc, method, opts...) }
参考资料
1.从实践到原理,带你参透grpc.
2..