一、RPC原理 1、RPC框架由来 单体应用体量越来越大,代码不好维护和管理,所以就产生了微服务架构,按照公共或功能模块拆分为一个个独立的服务,然后各独立的服务之间可以相互调用。 微服务之间相互调用,该如何实现? 首先要解决下面5个问题: 1、如何规定远程调用的语法? 2、如何传递参数? 3、如何表 ......








当客户端的应用想发起一个远程调用时,它实际是调用客户端的 stub。它负责将调用的接口、方法和参数,通过约定的协议规范进行编码,并通过本地的 rpcruntime 进行传输,将调用网络包发送到服务器。服务器端的 rpcruntime 收到请求后,交给服务器端的 stub 进行解码,然后调用服务端的方法,服务端执行方法,返回结果,服务器端的 stub 将返回结果编码后,发送给客户端,客户端的 rpcruntime 收到结果,发给客户端的 stub 解码得到结果,返回给客户端。

2、对于 stub 层,处理双方约定好的语法、语义、封装、解封装。
3、对于 rpcruntime,主要处理高性能的传输,以及网络的错误和异常。



grpc 是一个高性能、开源和通用的 rpc 框架,面向移动和 http/2 设计。目前提供 c、java 和 go 语言版本,分别是:grpc, grpc-java, grpc-go. 其中 c 版本支持 c, c++, node.js, python, ruby, objective-c, php 和 c# 支持。


2、protocol buffer原理文章

3、grpc-go github地址:

grpc-go 的stub层协议约定问题通过.proto文件约定好服务接口、参数等,通过工具protoc-gen-go生成客户端和服务端共用的对照表,想生成什么语言文件就用相应的插件,这样就实现了跨语言。
protoc --go_out=plugins=grpc:. *.proto
grpc rpcruntime层基于http/2设计,带来诸如双向流、流控、头部压缩、单 tcp 连接上的多复用请求等特性。

grpc server端启动


func main() {
    lis, err := net.listen("tcp", fmt.sprintf("localhost:%d", *port))
    if err != nil {
        log.fatalf("failed to listen: %v", err)

    var opts []grpc.serveroption
    if *tls {
        if *certfile == "" {
            *certfile = testdata.path("server1.pem")
        if *keyfile == "" {
            *keyfile = testdata.path("server1.key")
        creds, err := credentials.newservertlsfromfile(*certfile, *keyfile)
        if err != nil {
            log.fatalf("failed to generate credentials %v", err)
        opts = []grpc.serveroption{grpc.creds(creds)}
    grpcserver := grpc.newserver(opts...)
    pb.registerrouteguideserver(grpcserver, newserver())


关键处理就是一个for循环。如果accept() 返回错误,并且错误是临时性的,那么会有重试,重试时间以5ms翻倍增长,直到1s。

for {
        rawconn, err := lis.accept()
        if err != nil {
            if ne, ok := err.(interface {
                temporary() bool
            }); ok && ne.temporary() {
                if tempdelay == 0 {
                    tempdelay = 5 * time.millisecond
                } else {
                    tempdelay *= 2
                if max := 1 * time.second; tempdelay > max {
                    tempdelay = max
                s.printf("accept error: %v; retrying in %v", err, tempdelay)
                timer := time.newtimer(tempdelay)
                select {
                case <-timer.c:
                case <-s.quit.done():
                    return nil
            s.printf("done serving; accept = %v", err)

            if s.quit.hasfired() {
                return nil
            return err
        tempdelay = 0
        // start a new goroutine to deal with rawconn so we don't stall this accept
        // loop goroutine.
        // make sure we account for the goroutine so gracefulstop doesn't nil out
        // s.conns before this conn can be added.
        go func() {



st := s.newhttp2transport(conn, authinfo)
    if st == nil {

    if !s.addconn(st) {
    go func() {

grpc client端启动


    var opts []grpc.dialoption
    if *tls {
        if *cafile == "" {
            *cafile = testdata.path("ca.pem")
        creds, err := credentials.newclienttlsfromfile(*cafile, *serverhostoverride)
        if err != nil {
            log.fatalf("failed to create tls credentials %v", err)
        opts = append(opts, grpc.withtransportcredentials(creds))
    } else {
        opts = append(opts, grpc.withinsecure())

    opts = append(opts, grpc.withblock())
    conn, err := grpc.dial(*serveraddr, opts...)
    if err != nil {
        log.fatalf("fail to dial: %v", err)
    defer conn.close()
    client := pb.newrouteguideclient(conn)


unary rpc: 一元rpc

func (c *routeguideclient) getfeature(ctx context.context, in *point, opts ...grpc.calloption) (*feature, error) {
    out := new(feature)
    err := c.cc.invoke(ctx, "/routeguide.routeguide/getfeature", in, out, opts...)
    if err != nil {
        return nil, err
    return out, nil

// printfeature gets the feature for the given point.
func printfeature(client pb.routeguideclient, point *pb.point) {
    log.printf("getting feature for point (%d, %d)", point.latitude, point.longitude)
    ctx, cancel := context.withtimeout(context.background(), 10*time.second)
    defer cancel()
    feature, err := client.getfeature(ctx, point)
    if err != nil {
        log.fatalf("%v.getfeatures(_) = _, %v: ", client, err)

// getfeature returns the feature at the given point.
func (s *routeguideserver) getfeature(ctx context.context, point *pb.point) (*pb.feature, error) {
    for _, feature := range s.savedfeatures {
        if proto.equal(feature.location, point) {
            return feature, nil
    // no feature was found, return an unnamed feature
    return &pb.feature{location: point}, nil

server-side streaming rpc: 服务端流式rpc

func (c *routeguideclient) listfeatures(ctx context.context, in *rectangle, opts ...grpc.calloption) (routeguide_listfeaturesclient, error) {
    stream, err := c.cc.newstream(ctx, &_routeguide_servicedesc.streams[0], "/routeguide.routeguide/listfeatures", opts...)
    if err != nil {
        return nil, err
    x := &routeguidelistfeaturesclient{stream}
    if err := x.clientstream.sendmsg(in); err != nil {
        return nil, err
    if err := x.clientstream.closesend(); err != nil {
        return nil, err
    return x, nil

// printfeatures lists all the features within the given bounding rectangle.
func printfeatures(client pb.routeguideclient, rect *pb.rectangle) {
    log.printf("looking for features within %v", rect)
    ctx, cancel := context.withtimeout(context.background(), 10*time.second)
    defer cancel()
    stream, err := client.listfeatures(ctx, rect)
    if err != nil {
        log.fatalf("%v.listfeatures(_) = _, %v", client, err)
    for {
        feature, err := stream.recv()
        if err == io.eof {
        if err != nil {
            log.fatalf("%v.listfeatures(_) = _, %v", client, err)

// listfeatures lists all features contained within the given bounding rectangle.
func (s *routeguideserver) listfeatures(rect *pb.rectangle, stream pb.routeguide_listfeaturesserver) error {
    for _, feature := range s.savedfeatures {
        if inrange(feature.location, rect) {
            if err := stream.send(feature); err != nil {
                return err
    return nil

client-side streaming rpc: 客户端流式rpc

func (c *routeguideclient) recordroute(ctx context.context, opts ...grpc.calloption) (routeguide_recordrouteclient, error) {
    stream, err := c.cc.newstream(ctx, &_routeguide_servicedesc.streams[1], "/routeguide.routeguide/recordroute", opts...)
    if err != nil {
        return nil, err
    x := &routeguiderecordrouteclient{stream}
    return x, nil

// runrecordroute sends a sequence of points to server and expects to get a routesummary from server.
func runrecordroute(client pb.routeguideclient) {
    // create a random number of random points
    r := rand.new(rand.newsource(time.now().unixnano()))
    pointcount := int(r.int31n(100)) + 2 // traverse at least two points
    var points []*pb.point
    for i := 0; i < pointcount; i++ {
        points = append(points, randompoint(r))
    log.printf("traversing %d points.", len(points))
    ctx, cancel := context.withtimeout(context.background(), 10*time.second)
    defer cancel()
    stream, err := client.recordroute(ctx)
    if err != nil {
        log.fatalf("%v.recordroute(_) = _, %v", client, err)
    for _, point := range points {
        if err := stream.send(point); err != nil {
            log.fatalf("%v.send(%v) = %v", stream, point, err)
    reply, err := stream.closeandrecv()
    if err != nil {
        log.fatalf("%v.closeandrecv() got error %v, want %v", stream, err, nil)
    log.printf("route summary: %v", reply)

// recordroute records a route composited of a sequence of points.
// it gets a stream of points, and responds with statistics about the "trip":
// number of points,  number of known features visited, total distance traveled, and
// total time spent.
func (s *routeguideserver) recordroute(stream pb.routeguide_recordrouteserver) error {
    var pointcount, featurecount, distance int32
    var lastpoint *pb.point
    starttime := time.now()
    for {
        point, err := stream.recv()
        if err == io.eof {
            endtime := time.now()
            return stream.sendandclose(&pb.routesummary{
                pointcount:   pointcount,
                featurecount: featurecount,
                distance:     distance,
                elapsedtime:  int32(endtime.sub(starttime).seconds()),
        if err != nil {
            return err
        for _, feature := range s.savedfeatures {
            if proto.equal(feature.location, point) {
        if lastpoint != nil {
            distance += calcdistance(lastpoint, point)
        lastpoint = point

bidirectional streaming rpc : 双向流式rpc

func (c *routeguideclient) routechat(ctx context.context, opts ...grpc.calloption) (routeguide_routechatclient, error) {
    stream, err := c.cc.newstream(ctx, &_routeguide_servicedesc.streams[2], "/routeguide.routeguide/routechat", opts...)
    if err != nil {
        return nil, err
    x := &routeguideroutechatclient{stream}
    return x, nil

// runroutechat receives a sequence of route notes, while sending notes for various locations.
func runroutechat(client pb.routeguideclient) {
    notes := []*pb.routenote{
        {location: &pb.point{latitude: 0, longitude: 1}, message: "first message"},
        {location: &pb.point{latitude: 0, longitude: 2}, message: "second message"},
        {location: &pb.point{latitude: 0, longitude: 3}, message: "third message"},
        {location: &pb.point{latitude: 0, longitude: 1}, message: "fourth message"},
        {location: &pb.point{latitude: 0, longitude: 2}, message: "fifth message"},
        {location: &pb.point{latitude: 0, longitude: 3}, message: "sixth message"},
    ctx, cancel := context.withtimeout(context.background(), 10*time.second)
    defer cancel()
    stream, err := client.routechat(ctx)
    if err != nil {
        log.fatalf("%v.routechat(_) = _, %v", client, err)
    waitc := make(chan struct{})
    go func() {
        for {
            in, err := stream.recv()
            if err == io.eof {
                // read done.
            if err != nil {
                log.fatalf("failed to receive a note : %v", err)
            log.printf("got message %s at point(%d, %d)", in.message, in.location.latitude, in.location.longitude)
    for _, note := range notes {
        if err := stream.send(note); err != nil {
            log.fatalf("failed to send a note: %v", err)

// routechat receives a stream of message/location pairs, and responds with a stream of all
// previous messages at each of those locations.
func (s *routeguideserver) routechat(stream pb.routeguide_routechatserver) error {
    for {
        in, err := stream.recv()
        if err == io.eof {
            return nil
        if err != nil {
            return err
        key := serialize(in.location)

        s.routenotes[key] = append(s.routenotes[key], in)
        // note: this copy prevents blocking other clients while serving this one.
        // we don't need to do a deep copy, because elements in the slice are
        // insert-only and never modified.
        rn := make([]*pb.routenote, len(s.routenotes[key]))
        copy(rn, s.routenotes[key])

        for _, note := range rn {
            if err := stream.send(note); err != nil {
                return err

client 连接底层两个主要方法


newclientstream:获取传输层 trasport 并组合封装到 clientstream 中返回,在这块会涉及负载均衡、超时控制、 encoding、 stream 的动作,与服务端基本一致的行为。
cs.sendmsg:发送 rpc 请求出去,但其并不承担等待响应的功能。
cs.recvmsg:阻塞等待接受到的 rpc 方法响应结果。

// invoke sends the rpc request on the wire and returns after response is
// received.  this is typically called by generated code.
// all errors returned by invoke are compatible with the status package.
func (cc *clientconn) invoke(ctx context.context, method string, args, reply interface{}, opts ...calloption) error {
    // allow interceptor to see all applicable call options, which means those
    // configured as defaults from dial option as well as per-call options
    opts = combine(cc.dopts.calloptions, opts)

    if cc.dopts.unaryint != nil {
        return cc.dopts.unaryint(ctx, method, args, reply, cc, invoke, opts...)
    return invoke(ctx, method, args, reply, cc, opts...)

func invoke(ctx context.context, method string, req, reply interface{}, cc *clientconn, opts ...calloption) error {
    cs, err := newclientstream(ctx, unarystreamdesc, cc, method, opts...)
    if err != nil {
        return err
    if err := cs.sendmsg(req); err != nil {
        return err
    return cs.recvmsg(reply)


// newstream creates a new stream for the client side. this is typically
// called by generated code. ctx is used for the lifetime of the stream.
// to ensure resources are not leaked due to the stream returned, one of the following
// actions must be performed:
//      1. call close on the clientconn.
//      2. cancel the context provided.
//      3. call recvmsg until a non-nil error is returned. a protobuf-generated
//         client-streaming rpc, for instance, might use the helper function
//         closeandrecv (note that closesend does not recv, therefore is not
//         guaranteed to release all resources).
//      4. receive a non-nil, non-io.eof error from header or sendmsg.
// if none of the above happen, a goroutine and a context will be leaked, and grpc
// will not call the optionally-configured stats handler with a stats.end message.
func (cc *clientconn) newstream(ctx context.context, desc *streamdesc, method string, opts ...calloption) (clientstream, error) {
    // allow interceptor to see all applicable call options, which means those
    // configured as defaults from dial option as well as per-call options
    opts = combine(cc.dopts.calloptions, opts)

    if cc.dopts.streamint != nil {
        return cc.dopts.streamint(ctx, desc, cc, method, newclientstream, opts...)
    return newclientstream(ctx, desc, cc, method, opts...)

