欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

动手造*:实现一个简单的 EventBus

程序员文章站 2022-12-09 23:54:00
动手造*:实现一个简单的 EventBus Intro EventBus 是一种事件发布订阅模式,通过 EventBus 我们可以很方便的实现解耦,将事件的发起和事件的处理的很好的分隔开来,很好的实现解耦。 微软官方的示例项目 "EShopOnContainers" 也有在使用 EventBus ......

动手造*:实现一个简单的 eventbus

intro

eventbus 是一种事件发布订阅模式,通过 eventbus 我们可以很方便的实现解耦,将事件的发起和事件的处理的很好的分隔开来,很好的实现解耦。 微软官方的示例项目 eshoponcontainers 也有在使用 eventbus 。

这里的 eventbus 实现也是参考借鉴了微软 eshoponcontainers 项目。

eventbus 处理流程:

动手造*:实现一个简单的 EventBus

微服务间使用 eventbus 实现系统间解耦:

动手造*:实现一个简单的 EventBus

借助 eventbus 我们可以很好的实现组件之间,服务之间,系统之间的解耦以及相互通信的问题。

起初觉得 eventbus 和 mq 其实差不多嘛,都是通过异步处理来实现解耦合,高性能。后来看到了下面这张图才算明白为什么要用 eventbus 以及 eventbus 和 mq 之间的关系,eventbus 是抽象的,可以用mq来实现 eventbus.

动手造*:实现一个简单的 EventBus

为什么要使用 eventbus

  1. 解耦合(轻松的实现系统间解耦)
  2. 高性能可扩展(每一个事件都是简单独立且不可更改的对象,只需要保存新增的事件,不涉及其他的变更删除操作)
  3. 系统审计(每一个事件都是不可变更的,每一个事件都是可追溯的)
  4. ...

eventbus 整体架构:

  • ieventbase :所有的事件应该实现这个接口,这个接口定义了事件的唯一id eventid 和事件发生的事件 eventat

动手造*:实现一个简单的 EventBus

  • ieventhandler:定义了一个 handle 方法来处理相应的事件

动手造*:实现一个简单的 EventBus

  • ieventstore:所有的事件的处理存储,保存事件的ieventhandler,一般不会直接操作,通过 eventbus 的订阅和取消订阅来操作 eventstore

动手造*:实现一个简单的 EventBus

  • ieventbus:用来发布/订阅/取消订阅事件,并将事件的某一个 ieventhandler 保存到 eventstore 或从 eventstore 中移除

动手造*:实现一个简单的 EventBus

使用示例

来看一个使用示例,完整代码示例

internal class eventtest
{
    public static void maintest()
    {
        var eventbus = dependencyresolver.current.resolveservice<ieventbus>();
        eventbus.subscribe<counterevent, countereventhandler1>();
        eventbus.subscribe<counterevent, countereventhandler2>();
        eventbus.subscribe<counterevent, delegateeventhandler<counterevent>>();
        eventbus.publish(new counterevent { counter = 1 });

        eventbus.unsubscribe<counterevent, countereventhandler1>();
        eventbus.unsubscribe<counterevent, delegateeventhandler<counterevent>>();
        eventbus.publish(new counterevent { counter = 2 });
    }
}

internal class counterevent : eventbase
{
    public int counter { get; set; }
}

internal class countereventhandler1 : ieventhandler<counterevent>
{
    public task handle(counterevent @event)
    {
        loghelper.getlogger<countereventhandler1>().info($"event info: {@event.tojson()}, handler type:{gettype().fullname}");
        return task.completedtask;
    }
}

internal class countereventhandler2 : ieventhandler<counterevent>
{
    public task handle(counterevent @event)
    {
        loghelper.getlogger<countereventhandler2>().info($"event info: {@event.tojson()}, handler type:{gettype().fullname}");
        return task.completedtask;
    }
}

具体实现

eventstoreinmemory 实现:

eventstoreinmemory 是 ieventstore 将数据放在内存中的实现,使用了 concurrentdictionary 以及 hashset 来尽可能的保证高效,具体实现代码如下:

public class eventstoreinmemory : ieventstore
{
    private readonly concurrentdictionary<string, hashset<type>> _eventhandlers = new concurrentdictionary<string, hashset<type>>();

    public bool addsubscription<tevent, teventhandler>()
        where tevent : ieventbase
        where teventhandler : ieventhandler<tevent>
    {
        var eventkey = geteventkey<tevent>();
        if (_eventhandlers.containskey(eventkey))
        {
            return _eventhandlers[eventkey].add(typeof(teventhandler));
        }
        else
        {
            return _eventhandlers.tryadd(eventkey, new hashset<type>()
            {
                typeof(teventhandler)
            });
        }
    }

    public bool clear()
    {
        _eventhandlers.clear();
        return true;
    }

    public icollection<type> geteventhandlertypes<tevent>() where tevent : ieventbase
    {
        if(_eventhandlers.count == 0)
            return  new type[0];
        var eventkey = geteventkey<tevent>();
        if (_eventhandlers.trygetvalue(eventkey, out var handlers))
        {
            return handlers;
        }
        return new type[0];
    }

    public string geteventkey<tevent>()
    {
        return typeof(tevent).fullname;
    }

    public bool hassubscriptionsforevent<tevent>() where tevent : ieventbase
    {
        if(_eventhandlers.count == 0)
            return false;

        var eventkey = geteventkey<tevent>();
        return _eventhandlers.containskey(eventkey);
    }

    public bool removesubscription<tevent, teventhandler>()
        where tevent : ieventbase
        where teventhandler : ieventhandler<tevent>
    {
        if(_eventhandlers.count == 0)
            return false;

        var eventkey = geteventkey<tevent>();
        if (_eventhandlers.containskey(eventkey))
        {
            return _eventhandlers[eventkey].remove(typeof(teventhandler));
        }
        return false;
    }
}

eventbus 的实现,从上面可以看到 eventstore 保存的是 ieventhandler 对应的 type,在 publish 的时候根据 type 从 ioc 容器中取得相应的 handler 即可,如果没有在 ioc 容器中找到对应的类型,则会尝试创建一个类型实例,然后调用 ieventhandlerhandle 方法,代码如下:

/// <summary>
/// eventbus in process
/// </summary>
public class eventbus : ieventbus
{
    private static readonly iloghelperlogger logger = helpers.loghelper.getlogger<eventbus>();

    private readonly ieventstore _eventstore;
    private readonly iserviceprovider _serviceprovider;

    public eventbus(ieventstore eventstore, iserviceprovider serviceprovider = null)
    {
        _eventstore = eventstore;
        _serviceprovider = serviceprovider ?? dependencyresolver.current;
    }

    public bool publish<tevent>(tevent @event) where tevent : ieventbase
    {
        if (!_eventstore.hassubscriptionsforevent<tevent>())
        {
            return false;
        }
        var handlers = _eventstore.geteventhandlertypes<tevent>();
        if (handlers.count > 0)
        {
            var handlertasks = new list<task>();
            foreach (var handlertype in handlers)
            {
                try
                {
                    if (_serviceprovider.getserviceorcreateinstance(handlertype) is ieventhandler<tevent> handler)
                    {
                        handlertasks.add(handler.handle(@event));
                    }
                }
                catch (exception ex)
                {
                    logger.error(ex, $"handle event [{_eventstore.geteventkey<tevent>()}] error, eventhandlertype:{handlertype.fullname}");
                }
            }
            handlertasks.whenall().configureawait(false);

            return true;
        }
        return false;
    }

    public bool subscribe<tevent, teventhandler>()
        where tevent : ieventbase
        where teventhandler : ieventhandler<tevent>
    {
        return _eventstore.addsubscription<tevent, teventhandler>();
    }

    public bool unsubscribe<tevent, teventhandler>()
        where tevent : ieventbase
        where teventhandler : ieventhandler<tevent>
    {
        return _eventstore.removesubscription<tevent, teventhandler>();
    }
}

项目实例

来看一个实际的项目中的使用,在我的活动室预约项目中有一个公告的模块,访问公告详情页面,这个公告的访问次数加1,把这个访问次数加1改成了用 eventbus 来实现,实际项目代码:https://github.com/weihanli/activityreservation/blob/67e2cb8e92876629a7af6dc051745dd8c7e9faeb/activityreservation/startup.cs

  1. 定义 event 以及 eventhandler
public class noticeviewevent : eventbase
{
    public guid noticeid { get; set; }

    // userid
    // ip
    // ...
}

public class noticevieweventhandler : ieventhandler<noticeviewevent>
{
    public async task handle(noticeviewevent @event)
    {
        await dependencyresolver.current.tryinvokeserviceasync<reservationdbcontext>(async dbcontext =>
        {
            var notice = await dbcontext.notices.findasync(@event.noticeid);
            notice.noticevisitcount += 1;
            await dbcontext.savechangesasync();
        });
    }
}

这里的 event 只定义了一个 noticeid ,其实也可以把请求信息如ip/ua等信息加进去,在 eventhandler里处理以便日后数据分析。

  1. 注册 eventbus 相关服务以及 eventhandlers
services.addsingleton<ieventbus, eventbus>();
services.addsingleton<ieventstore, eventstoreinmemory>();
//register eventhandlers
services.addsingleton<noticevieweventhandler>();
  1. 订阅事件
public void configure(iapplicationbuilder app, ihostingenvironment env, iloggerfactory loggerfactory, ieventbus eventbus)
{
    eventbus.subscribe<noticeviewevent, noticevieweventhandler>(); 
    // ...
}
  1. 发布事件
eventbus.publish(new noticeviewevent { noticeid = notice.noticeid });

reference