欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

.net core 2.0 event bus 一个简单的基于内存事件总线实现

程序员文章站 2022-07-30 13:26:40
1.先定义一个事件接口 2.定义一个事件处理接口 3.定义一个发布接口 4.定义一个订阅接口 5.创建一个类用来存事件 6.实现发布类 7.实现订阅类 9.测试用例 10.测试结果 ......

1.先定义一个事件接口

public  interface  IEvent
    {
     
    }

2.定义一个事件处理接口

 public  interface  IEventHandler : IEvent
    {
       Task Handle(IEvent e);
    }

 

3.定义一个发布接口

public interface IEventPublisher
    {
        Task Publish<TEvent>(TEvent e) where TEvent : IEvent;
    }

4.定义一个订阅接口

public interface IEventSubscriber 
    {
        
        Task Subscribe<TEvent, EH>() where TEvent : IEvent where EH : class, IEventHandler, new();
    }

5.创建一个类用来存事件

 public static class MemoryMq
    {
        public static ConcurrentDictionary<string, IEvent> eventQueueDict { get; set; }

    }

6.实现发布类

 public class InMemoryEventPublisher : IEventPublisher
    {
        public Task Publish<TEvent>(TEvent @event) where TEvent : IEvent
        {
            if (@event == null) return Task.CompletedTask;
            if (MemoryMq.eventQueueDict == null)
            {
                MemoryMq.eventQueueDict = new ConcurrentDictionary<string, IEvent>();
            }
            MemoryMq.eventQueueDict.GetOrAdd(Guid.NewGuid().ToString(),@event);
            return Task.CompletedTask;
        }
    }

7.实现订阅类

public class InMemoryEventSubscriber: IEventSubscriber
    {
        private readonly ConcurrentDictionary<string, Task> taskDict = new ConcurrentDictionary<string, Task>();

   

        public Task Subscribe<TEvent, EH>() where TEvent : IEvent
           where EH : class, IEventHandler, new()
        {
            
            EH state = new EH();
           

            
            Task.Run(() =>
            {
                while (true)
                {
                    if (MemoryMq.eventQueueDict != null)
                    {
                        foreach (var a in MemoryMq.eventQueueDict)
                        {
                            state.Handle(a.Value as IEvent);
                            IEvent o;
                            MemoryMq.eventQueueDict.TryRemove(a.Key ,out o);
                        }
                    }

                }
                
            });
           
            return Task.CompletedTask;
        }
    }

 

9.测试用例

namespace MemoryMqTest
{
    public class EventHandler : IEventHandler
    {
        public Task Handle(IEvent e, MessagingHelper h)
        {
            switch (e)
            {

                case Order value: Console.WriteLine(value.name); break;
              
            }
            return Task.CompletedTask;

        }

    }
    public class Order : IEvent
    {
        public string name { get; set; }
    }
    class Program
    {
        static void Main(string[] args)
        {
            var servicecollection = new ServiceCollection();
            servicecollection.AddSingleton<IEventPublisher, InMemoryEventPublisher>();
            servicecollection.AddSingleton<IEventSubscriber, InMemoryEventSubscriber>();
            var provider = servicecollection.BuildServiceProvider();

           var eventPub = provider.GetService<IEventPublisher>();

            var _eventSub = provider.GetService<IEventSubscriber>();

            _eventSub.Subscribe<Order, EventHandler>();


            var order = new Order();
            order.name = "test";
            eventPub.Publish(order);

            Console.WriteLine("Hello World!");

            Console.ReadKey();
        }
    }
}

10.测试结果

.net core 2.0 event bus 一个简单的基于内存事件总线实现