欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

.NetCore利用BlockingCollection实现简易消息队列

程序员文章站 2023-11-05 22:14:04
消息队列现今的应用场景越来越大,常用的有rabbmitmq和kafka。 我们用blockingcollection来实现简单的消息队列。 blockingco...

消息队列现今的应用场景越来越大,常用的有rabbmitmq和kafka。

我们用blockingcollection来实现简单的消息队列。

blockingcollection实现了生产者/消费者模式,是对iproducerconsumercollection<t>接口的实现。与其他concurrent集合一样,每次add或take元素,都会导致对集合的lock。只有当确定需要在内存中创建一个生产者,消费者模式时,再考虑这个类。

msdn中的示例用法:

using (blockingcollection<int> bc = new blockingcollection<int>())
  {
    task.factory.startnew(() =>
    {
      for (int i = 0; i < 1000; i++)
      {
        bc.add(i);
        thread.sleep(50); 
      }
 
 
      // need to do this to keep foreach below from hanging
      bc.completeadding();
    });
 
 
    // now consume the blocking collection with foreach.
    // use bc.getconsumingenumerable() instead of just bc because the
    // former will block waiting for completion and the latter will
    // simply take a snapshot of the current state of the underlying collection.
    foreach (var item in bc.getconsumingenumerable())
    {
      console.writeline(item);
    }
  }

实现消息队列

用vs2017创建一个控制台应用程序。创建demoqueueblock类,封装一些常用判断。

  • hasele,判断是否有元素
  • add向队列中添加元素
  • take从队列中取出元素

为了不把blockingcollection直接暴漏给使用者,我们封装一个demoqueueblock类

  /// <summary>
  /// blockingcollection演示消息队列
  /// </summary>
  /// <typeparam name="t"></typeparam>
  public class demoqueueblock<t> where t : class
  {
    private static blockingcollection<t> colls;
    public demoqueueblock()
    {

    }
    public static bool iscomleted() {
      if (colls != null && colls.iscompleted) {
        return true;
      }
      return false;
    }
    public static bool hasele()
    {
      if (colls != null && colls.count>0)
      {
        return true;
      }
      return false;
    }
    
    public static bool add(t msg)
    {
      if (colls == null)
      {
        colls = new blockingcollection<t>();
      }
      colls.add(msg);
      return true;
    }
    public static t take()
    {
      if (colls == null)
      {
        colls = new blockingcollection<t>();
      }
      return colls.take();
    }
  }

  /// <summary>
  /// 消息体
  /// </summary>
  public class demomessage
  {
    public string businesstype { get; set; }
    public string businessid { get; set; }
    public string body { get; set; }
  }

添加元素进队列

通过控制台,添加元素

      //添加元素
      while (true)
      {
        console.writeline("请输入队列");
        var read = console.readline();
        if (read == "exit")
        {
          return;
        }

        demoqueueblock<demomessage>.add(new demomessage() { businessid = read });
      }

消费队列

通过判断iscomleted,来确定是否获取队列

 task.factory.startnew(() =>
      {
        //从队列中取元素。
        while (!demoqueueblock<demomessage>.iscomleted())
        {
          try
          {
            var m = demoqueueblock<demomessage>.take();
           console.writeline("已消费:" + m.businessid);
          }
          catch (exception ex)
          {
            console.writeline(ex.message);
          }
        }
      });

查看运行结果

.NetCore利用BlockingCollection实现简易消息队列

运行结果

这样我们就实现了简易的消息队列。

示例源码:简易队列

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。