欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

【.NET Core项目实战-统一认证平台】第十六章 网关篇-Ocelot集成RPC服务

程序员文章站 2023-02-24 17:52:09
" 【.NET Core项目实战 统一认证平台】开篇及目录索引 " 一、什么是RPC RPC是“远程调用( Remote Procedure Call )”的一个名称的缩写,并不是任何规范化的协议,也不是大众都认知的协议标准,我们更多时候使用时都是创建的自定义化(例如Socket,Netty)的消息 ......

【.net core项目实战-统一认证平台】开篇及目录索引

一、什么是rpc

rpc是“远程调用(remote procedure call)”的一个名称的缩写,并不是任何规范化的协议,也不是大众都认知的协议标准,我们更多时候使用时都是创建的自定义化(例如socket,netty)的消息方式进行调用,相比http协议,我们省掉了不少http中无用的消息内容。因此很多系统内部调用仍然采用自定义化的rpc调用模式进行通信,毕竟速度和性能是内网的关键指标之一,而标准化和语义无关性在外网中举足轻重。所以,为何api网关无法工作在rpc上,因为它没有一个像http/https那样的通用标准。

二、czarrpc简介

czarrpc是作者基于dotnetty实现的rpc通讯框架,参考了surgingtars.net优秀设计,目前正在内部使用中,下面就czarrpc调用方式做一个简单介绍,测试结构如下:
【.NET Core项目实战-统一认证平台】第十六章 网关篇-Ocelot集成RPC服务

1、服务接口

新建一个czar.rpc.common类库,首先需要引用czar.rpcnuget包。

install-package czar.rpc

然后定义测试接口ihellorpc.cs,也是目前支持的调用方式。

using czar.rpc.attributes;
using czar.rpc.exceptions;
using czar.rpc.metadata;
using system;
using system.collections.generic;
using system.threading.tasks;

namespace czar.rpc.common
{
    /// <summary>
    /// 测试rpc实体
    /// </summary>
    [businessexceptioninterceptor]
    [czarrpc("demo.rpc.hello")]
    public interface ihellorpc: irpcbaseservice
    {
        string hello(int no, string name);

        void helloholder(int no, out string name);

        task<string> hellotask(int no, string name);

        valuetask<string> hellovaluetask(int no, string name);

        [czaroneway]
        void hellooneway(int no, string name);

        task testbusinessexceptioninterceptor();

        demomodel hellomodel(int d1, string d2, datetime d3);

        task<demomodel> hellomodelasync(int d1, string d2, datetime d3);

        demomodel hellosendmodel(demomodel model);

        demomodel hellosendmodelparm(string name,demomodel model);

        list<demomodel> hellosendmodellist(list<demomodel> model);
    }
    public class demomodel
    {
        /// <summary>
        /// 测试1
        /// </summary>
        public int t1 { get; set; }

        /// <summary>
        /// 测试2
        /// </summary>
        public string t2 { get; set; }

        /// <summary>
        /// 测试3
        /// </summary>
        public datetime t3 { get; set; }

        public childmodel child { get; set; }
    }

    public class childmodel
    {
        public string c1 { get; set; }
    }
}

2.服务端

新建一个控制台程序czar.rpc.server,然后实现服务接口,因为都是测试数据,所以就随意实现了方法。

hellorpcserver.cs

using czar.rpc.exceptions;
using system;
using system.collections.generic;
using system.threading.tasks;
using system.linq;
using system.net;
using czar.rpc.common;

namespace demo.rpc.server
{
    public class hellorpcserver: ihellorpc
    {
        public endpoint czarendpoint { get; set; }

        public string hello(int no, string name)
        {
            string result = $"{no}: hi, {name}";
            console.writeline(result);
            return result + " callback";
        }

        public void helloholder(int no, out string name)
        {
            name = no.tostring() + " out";
        }

        public void hellooneway(int no, string name)
        {
            /*
                耗时操作
            */
            console.writeline($"from oneway - {no}: hi, {name}");
        }

        public task<string> hellotask(int no, string name)
        {
            return task.fromresult(hello(no, name));
        }

        public valuetask<string> hellovaluetask(int no, string name)
        {
            return new valuetask<string>(hello(no, name));
        }

        public task testbusinessexceptioninterceptor()
        {
            throw new businessexception()
            {
                czarcode = "1",
                czarmessage = "test"
            };
        }

        public demomodel hellomodel(int d1, string d2, datetime d3)
        {
            return new demomodel()
            {
                t1 = d1 + 1,
                t2 = d2 + "2",
                t3 = d3.adddays(1)
            };
        }

        public async task<demomodel> hellomodelasync(int d1, string d2, datetime d3)
        {
            return await task.fromresult(
               new demomodel()
               {
                   t1 = d1 + 1,
                   t2 = d2 + "77777",
                   t3 = d3.adddays(1)
               }
                );
        }

        public demomodel hellosendmodel(demomodel model)
        {
            model.t1 = model.t1 + 10;
            model.t2 = model.t2 + "11";
            model.t3 = model.t3.adddays(12);
            return model;
        }

        public demomodel hellosendmodelparm(string name, demomodel model)
        {
            model.t1 = model.t1 + 10;
            model.t2 = model.t2 + "11";
            model.t3 = model.t3.adddays(12);
            if (model.child != null)
            {
                model.child.c1 = name+"说:"+ model.child.c1;
            }
            return model;
        }

        public list<demomodel> hellosendmodellist(list<demomodel> model)
        {
            return model.select(t => new demomodel() { t1=t.t1+10,t2=t.t2+"13",t3=t.t3.addyears(1),child=t.child }).tolist();
        }
    }
}

然后启动服务端监听。

class program
    {
        static void main(string[] args)
        {
            var host = new hostbuilder()
                .configurehostconfiguration(i => i.addjsonfile("czarconfig.json"))
                .configurelogging((hostcontext, configlogging) =>
                {
                    configlogging.addconsole();
                })
                .usecodec<jsoncodec>()
                .uselibuvtcphost()
                .useproxy()
                .useconsolelifetime()
                .build();

            host.runasync().wait();
        }
    }

启用外部使用czarconfig.json的配置文件,注意需要设置成始终复制。

{
  "czarhost": {
    "port": 7711, //监听端口
    "quietperiodseconds": 2,  //退出静默时间   dotnetty特性
    "shutdowntimeoutseconds": 2, //关闭超时时间 dotnetty特性
    "isssl": "false",  //是否启用 ssl, 客户端需要保持一致
    "pfxpath": "cert/datasync.pfx", //证书
    "pfxpassword": "123456"  //证书密钥 
  }
}

到此服务器端搭载完成。

3、客户端

新建客户端控制台程序czar.rpc.client,然后配置rpc调用信息。

{
  "czarhost": {
    "proxyendpoint": true, //是否启用动态服务地址,就是指定服务端ip
    "isssl": "false", //是否启用ssl
    "pfxpath": "cert/datasync.pfx", //证书
    "pfxpassword": "123456", //证书密钥
    "clientconfig": {   //客户端配置
      "demo.rpc.hello": {   //对应服务[czarrpc("demo.rpc.hello")] 值
        "host": "127.0.0.1", //服务端ip 如果proxyendpoint=false 时使用
        "port": 7711, //服务端端口 如果proxyendpoint=false 时使用
        "timeout": 10, //调用超时时间
        "writeridletimeseconds";30  //空闲超时时间,默认为30秒,非内网环境建议设置成5分钟内。
      }
    }
  }
}

现在开始启用客户端信息。

class program
    {
        public static iserviceprovider service;
        public static iconfiguration config;
        static async task main(string[] args)
        {
            try
            {
                var builder = new configurationbuilder();
                config = builder.addjsonfile("czarconfig.json").build();
                
              service = new servicecollection()
                    .addsingleton(config)
                    .addlogging(j => j.addconsole())
                    .addlibuvtcpclient(config)
                    .addproxy()
                    .builddynamicproxyserviceprovider();

                var rpc = service.getrequiredservice<ihellorpc>();
                //使用的内部指定的服务器地址
                rpc.czarendpoint = new ipendpoint(ipaddress.parse("127.0.0.1"), 7711);
                var result = string.empty;
                
                string t = "基本调用";
                result = rpc.hello(18, t);
                console.writeline(result);

                result = "无返回结果";
                rpc.helloholder(1, out result);
                console.writeline(result);
                result = await rpc.hellotask(2, "异步任务");
                console.writeline(result);
                result = "单向";
                rpc.hellooneway(3, "单向调用");
                console.writeline(result);
                result = await rpc.hellovaluetask(4, "valuetask任务");
                console.writeline(result);

                var modelresult = rpc.hellomodel(5, "返回实体", datetime.now);
                console.writeline($"{modelresult.t1} {modelresult.t2} {modelresult.t3.tolongdatestring()}");


                var modelresult1 = await rpc.hellomodelasync(6, "返回task实体", datetime.now);
                console.writeline($"{modelresult1.t1} {modelresult1.t2} {modelresult1.t3.tolongdatestring()}");

                var mm = new demomodel()
                {
                    t1 = 7,
                    t2 = "传实体返回实体",
                    t3 = datetime.now,
                    child = new childmodel()
                    {
                        c1 = "子类1"
                    }
                };
                var model2 = rpc.hellosendmodel(mm);
                console.writeline($"{model2.t1} {model2.t2} {model2.t3.tolongdatestring()}  {model2.child.c1}");

                var list = new list<demomodel>();
                var mm1 = new demomodel()
                {
                    t1 = 8,
                    t2 = "传list返回list",
                    t3 = datetime.now,
                    child = new childmodel()
                    {
                        c1 = "子类2"
                    }
                };
                var mm3 = new demomodel()
                {
                    t1 = 9,
                    t2 = "传list返回list",
                    t3 = datetime.now,
                    child = new childmodel()
                    {
                        c1 = "子类3"
                    }
                };
                list.add(mm1);
                list.add(mm3);
                var list3 = rpc.hellosendmodellist(list);
                console.writeline($"{list3[0].t1} {list3[0].t2} {list3[0].t3.tolongdatestring()} {list3[0].child?.c1}");


                var mm4 = new demomodel()
                {
                    t1 = 9,
                    t2 = "hellosendmodelparm",
                    t3 = datetime.now,
                    child = new childmodel()
                    {
                        c1 = "子类4"
                    }
                };
                var dd = rpc.hellosendmodelparm("hellosendmodelparm", mm4);
                console.writeline($"{dd.t1} {dd.t2} {dd.t3.tolongdatestring()}  {dd.child.c1}");

                //异常调用
                await rpc.testbusinessexceptioninterceptor();
            }
            catch (businessexception e)
            {
                console.writeline($"czarcode:{e.czarcode} czarmessage:{e.czarmessage}");
            }
            catch (exception ex)
            {
                console.writeline(ex);
            }
            console.readline();
        }
    }

现在整个rpc调用搭建完毕,然后分别启动服务器端和客户端,就可以看到屏幕输出内容如下。

客户端输出:
【.NET Core项目实战-统一认证平台】第十六章 网关篇-Ocelot集成RPC服务

服务器端输出:
【.NET Core项目实战-统一认证平台】第十六章 网关篇-Ocelot集成RPC服务

至此整个czarrpc的基本使用已经介绍完毕,感兴趣的朋友可以自行测试。

三、ocelot增加rpc支持

有了czarrpc的通讯框架后,现在在ocelot上实现rpc功能简直易如反掌,现在开始添加我们的rpc中间件,也让我们扩展的网关灵活起来。

还记得我介绍网关篇时添加中间件的步骤吗?如果不记得的可以先回去回顾下。

首先如何让网关知道这个后端调用是http还是rpc呢?这时应该会想到ocelot路由配置里的downstreamscheme,可以在这里判断我们定义的是http还是rpc即可。同时我们希望之前定义的所有中间件都生效,最后一步请求时如果配置下端路由rpc,使用rpc调用,否则使用http调用,这样可以重复利用之前所有的中间件功能,减少重复开发。

在之前的开发的自定义限流和自定义授权中间件开发中,我们知道开发完的中间件放到哪里使用,这里就不介绍原理了,直接添加到buildczarocelotpipeline里如下代码。

public static ocelotrequestdelegate buildczarocelotpipeline(this iocelotpipelinebuilder builder,
            ocelotpipelineconfiguration pipelineconfiguration)
        {
           
            // 注册一个全局异常
            builder.useexceptionhandlermiddleware();

            // 如果请求是websocket使用单独的管道
            builder.mapwhen(context => context.httpcontext.websockets.iswebsocketrequest,
                app =>
                {
                    app.usedownstreamroutefindermiddleware();
                    app.usedownstreamrequestinitialiser();
                    app.useloadbalancingmiddleware();
                    app.usedownstreamurlcreatormiddleware();
                    app.usewebsocketsproxymiddleware();
                });

            // 添加自定义的错误管道
            builder.useifnotnull(pipelineconfiguration.preerrorrespondermiddleware);

            //使用自定义的输出管道
            builder.useczarrespondermiddleware();

            // 下游路由匹配管道
            builder.usedownstreamroutefindermiddleware();

            //增加自定义扩展管道
            if (pipelineconfiguration.mapwhenocelotpipeline != null)
            {
                foreach (var pipeline in pipelineconfiguration.mapwhenocelotpipeline)
                {
                    builder.mapwhen(pipeline);
                }
            }

            // 使用http头部转换管道
            builder.usehttpheaderstransformationmiddleware();

            // 初始化下游请求管道
            builder.usedownstreamrequestinitialiser();

            // 使用自定义限流管道
            builder.useratelimiting();

            //使用请求id生成管道
            builder.userequestidmiddleware();

            //使用自定义授权前管道
            builder.useifnotnull(pipelineconfiguration.preauthenticationmiddleware);

            //根据请求判断是否启用授权来使用管道
            if (pipelineconfiguration.authenticationmiddleware == null)
            {
                builder.useauthenticationmiddleware();
            }
            else
            {
                builder.use(pipelineconfiguration.authenticationmiddleware);
            }

            //添加自定义限流中间件 2018-11-18 金焰的世界
            builder.useczarclientratelimitmiddleware();

            //添加自定义授权中间件  2018-11-15 金焰的世界
            builder.useahphauthenticationmiddleware();

            //启用自定义的认证之前中间件
            builder.useifnotnull(pipelineconfiguration.preauthorisationmiddleware);

            //是否使用自定义的认证中间件
            if (pipelineconfiguration.authorisationmiddleware == null)
            {
                builder.useauthorisationmiddleware();
            }
            else
            {
                builder.use(pipelineconfiguration.authorisationmiddleware);
            }

            // 使用自定义的参数构建中间件
            builder.useifnotnull(pipelineconfiguration.prequerystringbuildermiddleware);

            // 使用负载均衡中间件
            builder.useloadbalancingmiddleware();

            // 使用下游地址创建中间件
            builder.usedownstreamurlcreatormiddleware();

            // 使用缓存中间件
            builder.useoutputcachemiddleware();

            //判断下游的是否启用rpc通信,切换到rpc处理
            builder.mapwhen(context => context.downstreamreroute.downstreamscheme.equals("rpc", stringcomparison.ordinalignorecase), app =>
            {
                app.useczarrpcmiddleware();
            });

            //使用下游请求中间件
            builder.useczahttprequestermiddleware();

            return builder.build();
        }

这里是在最后请求前判断使用的下游请求方式,如果downstreamscheme使用的rpc,就使用rpc中间件处理。

rpc处理的完整逻辑是,如何从http请求中获取想要解析的参数,这里需要设置匹配的优先级,目前设计的优先级为。

1、首先提取路由参数,如果匹配上就是用路由参数名称为key,值为value,按顺序组成第一批参数。

2、提取query参数,如有有值按顺序组成第二批参数。

3、如果非get请求,提取body内容,如果非空,组成第三批参数

4、从配置库里提取rpc路由调用的服务名称和函数名称,以及是否单向调用。

5、按照获取的数据进行rpc调用并等待返回。

看了上面的设计是不是思路很清晰了呢?

1、rpc路由表设计

create table ahphrerouterpcconfig
(
    rpcid int identity(1,1) not null,
    rerouteid int,                      //路由表主键
    servantname varchar(100) not null,  //调用的服务名称
    funcname varchar(100) not null,     //调用的方法名称
    isoneway bit not null               //是否单向调用
)

2、提取远程调用方法

根据上游路由获取远程调用的配置项目

public interface irpcrepository
{
    /// <summary>
    /// 根据模板地址获取rpc请求方法
    /// </summary>
    /// <param name="upurl">上游模板</param>
    /// <returns></returns>
    task<remoteinvokemessage> getremotemethodasync(string upurl);
}

public class sqlserverrpcrepository : irpcrepository
    {
        private readonly czarocelotconfiguration _option;
        public sqlserverrpcrepository(czarocelotconfiguration option)
        {
            _option = option;
        }

        /// <summary>
        /// 获取rpc调用方法
        /// </summary>
        /// <param name="upurl"></param>
        /// <returns></returns>
        public async task<remoteinvokemessage> getremotemethodasync(string upurl)
        {
            using (var connection = new sqlconnection(_option.dbconnectionstrings))
            {
                string sql = @"select t4.* from ahphglobalconfiguration t1 inner join ahphconfigreroutes t2 on
t1.ahphid=t2.ahphid inner join ahphreroute t3 on t2.rerouteid=t3.rerouteid 
inner join ahphrerouterpcconfig t4 on t3.rerouteid=t4.rerouteid
where isdefault=1 and t1.infostatus=1 and t3.infostatus=1 and upstreampathtemplate=@url";
                var result = await connection.queryfirstordefaultasync<remoteinvokemessage>(sql, new { url = upurl });
                return result;
            }
        }
    }

3、重写返回结果

由于rpc调用后是返回的json封装的信息,需要解析成对应的httpcontent。

using system.io;
using system.net;
using system.net.http;
using system.threading.tasks;

namespace czar.gateway.rpc
{
    public class rpchttpcontent : httpcontent
    {
        private string result;

        public rpchttpcontent(string result)
        {
            this.result = result;
        }

        public rpchttpcontent(object result)
        {
            this.result = newtonsoft.json.jsonconvert.serializeobject(result);
        }

        protected override async task serializetostreamasync(stream stream, transportcontext context)
        {
            var writer = new streamwriter(stream);
            await writer.writeasync(result);
            await writer.flushasync();
        }

        protected override bool trycomputelength(out long length)
        {
            length = result.length;
            return true;
        }
    }
}

4、rpc中间件逻辑处理

有了前面的准备信息,现在基本可以完成逻辑代码的开发了,详细的中间件代码如下。

using czar.gateway.errors;
using czar.rpc.clients;
using ocelot.logging;
using ocelot.middleware;
using ocelot.responses;
using system.collections.generic;
using system.net;
using system.threading.tasks;

namespace czar.gateway.rpc.middleware
{
    public class czarrpcmiddleware : ocelotmiddleware
    {
        private readonly ocelotrequestdelegate _next;
        private readonly irpcclientfactory _clientfactory;
        private readonly iczarrpcprocessor _czarrpcprocessor;
        public czarrpcmiddleware(ocelotrequestdelegate next, irpcclientfactory clientfactory,
            iocelotloggerfactory loggerfactory, iczarrpcprocessor czarrpcprocessor) : base(loggerfactory.createlogger<czarrpcmiddleware>())
        {
            _next = next;
            _clientfactory = clientfactory;
            _czarrpcprocessor = czarrpcprocessor;
        }

        public async task invoke(downstreamcontext context)
        {
            var httpstatuscode = httpstatuscode.ok;
            var _param = new list<object>();
            //1、提取路由参数
            var tmpinfo = context.templateplaceholdernameandvalues;
            if (tmpinfo != null && tmpinfo.count > 0)
            {
                foreach (var tmp in tmpinfo)
                {
                    _param.add(tmp.value);
                }
            }
            //2、提取query参数
            foreach (var _q in context.httpcontext.request.query)
            {
                _param.add(_q.value.tostring());
            }
            //3、从body里提取内容
            if (context.httpcontext.request.method.toupper() != "get")
            {
                context.downstreamrequest.scheme = "http";
                var requert = context.downstreamrequest.tohttprequestmessage();
                if (requert.content!=null)
                {
                    var json = "{}";
                    json = await requert.content.readasstringasync();
                    _param.add(json);
                }
            }
            //从缓存里提取
            var req = await _czarrpcprocessor.getremotemethodasync(context.downstreamreroute.upstreampathtemplate.originalvalue);
            if (req != null)
            {
                req.parameters = _param.toarray();
                var result = await _clientfactory.sendasync(req, getendpoint(context.downstreamrequest.host, context.downstreamrequest.port));
                okresponse<rpchttpcontent> httpresponse;
                if (result.czarcode == czar.rpc.utilitys.rpcstatuscode.success)
                {
                    httpresponse = new okresponse<rpchttpcontent>(new rpchttpcontent(result.czarresult?.tostring()));
                }
                else
                {
                    httpresponse = new okresponse<rpchttpcontent>(new rpchttpcontent(result));
                }
                context.httpcontext.response.contenttype = "application/json";
                context.downstreamresponse = new downstreamresponse(httpresponse.data, httpstatuscode, httpresponse.data.headers, "ok");
            }
            else
            {//输出错误
                var error = new internalservererror($"请求路由 {context.httpcontext.request.path}未配置后端转发");
                logger.logwarning($"{error}");
                setpipelineerror(context, error);
            }
        }
        private endpoint getendpoint(string ipaddress, int port)
        {
            if (ipaddress.tryparse(ipaddress, out ipaddress ip))
            {
                return new ipendpoint(ip, port);
            }
            else
            {
                return new dnsendpoint(ipaddress, port);
            }
        }
    }
}

5、启动rpc客户端配置

目前rpc的客户端配置我们还没启动,只需要在addczarocelot中添加相关注入即可。

var service = builder.first(x => x.servicetype == typeof(iconfiguration));
var configuration = (iconfiguration)service.implementationinstance;
//rpc应用
builder.addsingleton<iczarrpcprocessor, czarrpcprocessor>();
builder.addsingleton<irpcrepository, sqlserverrpcrepository>();
builder.addlibuvtcpclient(configuration);

6、配置客户端

最后别忘了配置rpc客户端信息是否启用证书信息,为了配置信息的内容。

{
  "czarhost": {
    "proxyendpoint": true,
    "isssl": "false",
    "pfxpath": "cert/datasync.pfx",
    "pfxpassword": "bl123456",
    "clientconfig": {
      "demo.rpc.hello": {
        "host": "127.0.0.1",
        "port": 7711,
        "timeout": 20
      }
    }
  }
}

现在让网关集成rpc功能全部配置完毕。

四、网关rpc功能测试

本次测试我在原有的网关基础上,增加不同类型的rpc调用,就按照不同维度测试rpc调用功能,本次测试案例是建立在czar.rpc 服务端基础上,正好可以测试。
【.NET Core项目实战-统一认证平台】第十六章 网关篇-Ocelot集成RPC服务

【.NET Core项目实战-统一认证平台】第十六章 网关篇-Ocelot集成RPC服务

1、测试路由参数

请求路径/hello/{no}/{name},调用的服务端方法hello,传入的两个参数分别是no ,name

可以在服务器端添加断点调试,发现确实接收到请求信息,并正常返回,下面是postman测试结果。
【.NET Core项目实战-统一认证平台】第十六章 网关篇-Ocelot集成RPC服务

2、使用query方式传递参数

请求路径/rpc/query,调用的服务端方法还是hello,参数分别是no ,name
【.NET Core项目实战-统一认证平台】第十六章 网关篇-Ocelot集成RPC服务

3、使用post方式传递json

请求路径/rpc/body,调用的服务器方法是hellosendmodel
【.NET Core项目实战-统一认证平台】第十六章 网关篇-Ocelot集成RPC服务

4、混合参数使用

请求的路径/rpc/bodyparm/{name},调用的服务器端方法是hellosendmodelparm
【.NET Core项目实战-统一认证平台】第十六章 网关篇-Ocelot集成RPC服务

所有的返回结果可自行调试测试,发现都能达到预期结果。

同时此网关还是支持默认的http请求的,这里就不一一测试了。

五、总结

本篇我介绍了什么是rpc,以及czar.rpc的基本使用,然后使用czar.rpc框架集成到我们基于ocelot扩展网关中,并实现了不能方式的rpc调用,可以在几乎不改变现有流程的情况下很快速的集成进去,这也是ocelot开发框架的魅力所在。

如果在使用过程中有什么问题或建议,可以在.net core项目实战交流群(637326624)中联系作者。

最后本文涉及的所有的源代码可在中下载预览。