欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

ActiveMQ在C#中的应用示例分析

程序员文章站 2022-05-18 13:00:57
本文实例讲述了activemq在c#中的应用。分享给大家供大家参考,具体如下: activemq是个好东东,不必多说。activemq提供多种语言支持,如java, c,...

本文实例讲述了activemq在c#中的应用。分享给大家供大家参考,具体如下:

activemq是个好东东,不必多说。activemq提供多种语言支持,如java, c, c++, c#, ruby, perl, python, php等。由于我在windows下开发gui,比较关心c++和c#,其中c#的activemq很简单,apache提供nms(.net messaging service)支持.net开发,只需如下几个步骤即能建立简单的实现。c++的应用相对麻烦些,后面会有文章介绍。

1、去activemq官方网站下载最新版的activemq,网址:。我之前下的是5.3.1,5.3.2现在也已经出来了。

2、去activemq官方网站下载最新版的apache.nms,网址:,需要下载apache.nms和apache.nms.activemq两个bin包,如果对源码感兴趣,也可下载src包。这里要提醒一下,如果下载1.2.0版本的nms.activemq,apache.nms.activemq.dll在实际使用中有个bug,即停止activemq应用时会抛waitone函数异常,查看src包中的源码发现是由于apache.nms.activemq-1.2.0-src\src\main\csharp\transport\inactivitymonitor.cs中的如下代码造成的,修改一下源码重新编译即可。看了一下最新版1.3.0已经修复了这个bug,因此下载最新版即可。

private void stopmonitorthreads()
{
  lock(monitor)
  {
    if(monitorstarted.compareandset(true, false))
    {
      autoresetevent shutdownevent = new autoresetevent(false);
      // attempt to wait for the timers to shutdown, but don't wait
      // forever, if they don't shutdown after two seconds, just quit.
      this.readchecktimer.dispose(shutdownevent);
      shutdownevent.waitone(timespan.frommilliseconds(2000));
      this.writechecktimer.dispose(shutdownevent);
      shutdownevent.waitone(timespan.frommilliseconds(2000));
      //waitone的定义:public virtual bool waitone(timespan timeout,bool exitcontext)
      this.asynctasks.shutdown();
      this.asynctasks = null;
      this.asyncwritetask = null;
      this.asyncerrortask = null;
    }
  }
}

3、运行activemq,找到activemq解压后的bin文件夹:...\apache-activemq-5.3.1\bin,执行activemq.bat批处理文件即可启动activemq服务器,默认端口为61616,这可在配置文件中修改。

4、写c#程序实现activemq的简单应用。新建c#工程(一个producter项目和一个consumer项目),winform或console程序均可,这里建的是console工程,添加对apache.nms.dll和apache.nms.activemq.dll的引用,然后即可编写实现代码了,简单的producer和consumer实现代码如下:

producer:

using system;
using system.collections.generic;
using system.text;
using apache.nms;
using apache.nms.activemq;
using system.io;
using system.xml.serialization;
using system.runtime.serialization.formatters.binary;
namespace publish
{
  class program
  {
    static void main(string[] args)
    {
      try
      {
        //create the connection factory
        iconnectionfactory factory = new connectionfactory("tcp://localhost:61616/");
        using (iconnection connection = factory.createconnection())
        {
          //create the session
          using (isession session = connection.createsession())
          {
            //create the producer for the topic/queue
            imessageproducer prod = session.createproducer(
              new apache.nms.activemq.commands.activemqtopic("testing"));
            //send messages
            int i = 0;
            while (!console.keyavailable)
            {
              itextmessage msg = prod.createtextmessage();
              msg.text = i.tostring();
              console.writeline("sending: " + i.tostring());
              prod.send(msg, apache.nms.msgdeliverymode.nonpersistent, apache.nms.msgpriority.normal, timespan.minvalue);
              system.threading.thread.sleep(5000);
              i++;
            }
          }
        }
        console.readline();
      }
      catch (system.exception e)
      {
        console.writeline("{0}",e.message);
        console.readline();
      }
    }
  }
}

consumer:

using system;
using system.collections.generic;
using system.text;
using apache.nms;
using apache.nms.activemq;
using system.io;
using system.xml.serialization;
using system.runtime.serialization.formatters.binary;
namespace subscribe
{
  class program
  {
    static void main(string[] args)
    {
      try
      {
        //create the connection factory
        iconnectionfactory factory = new connectionfactory("tcp://localhost:61616/");
        //create the connection
        using (iconnection connection = factory.createconnection())
        {
          connection.clientid = "testing listener";
          connection.start();
          //create the session
          using (isession session = connection.createsession())
          {
            //create the consumer
            imessageconsumer consumer = session.createdurableconsumer(new apache.nms.activemq.commands.activemqtopic("testing"), "testing listener", null, false);
            consumer.listener += new messagelistener(consumer_listener);
            console.readline();
          }
          connection.stop();
          connection.close();
        }
      }
      catch (system.exception e)
      {
        console.writeline(e.message);
      }
    }
    static void consumer_listener(imessage message)
    {
      try
      {
        itextmessage msg = (itextmessage)message;
        console.writeline("receive: " + msg.text);
      }
      catch (system.exception e)
      {
        console.writeline(e.message);
      }
    }
  }
}

程序实现的功能:生产者producer建立名为testing的主题,并每隔5秒向该主题发送消息,消费者consumer订阅了testing主题,因此只要生产者发送testing主题的消息到activemq服务器,服务器就将该消息发送给订阅了testing主题的消费者。

编译生成producer.exe和consumer.exe,并执行两个exe,即可看到消息的发送与接收了。

这个例子是建的主题(topic),activemq还支持另一种方式:queue,即p2p,两者有什么区别呢?区别在于,topic是广播,即如果某个topic被多个消费者订阅,那么只要有消息到达服务器,服务器就将该消息发给全部的消费者;而queue是点到点,即一个消息只能发给一个消费者,如果某个queue被多个消费者订阅,没有特殊情况的话消息会一个一个地轮流发给不同的消费者,比如:

msg1-->consumer a

msg2-->consumer b

msg3-->consumer c

msg4-->consumer a

msg5-->consumer b

msg6-->consumer c

特殊情况是指:activemq支持过滤机制,即生产者可以设置消息的属性(properties),该属性与消费者端的selector对应,只有消费者设置的selector与消息的properties匹配,消息才会发给该消费者。topic和queue都支持selector。

properties和selector该如何设置呢?请看如下代码:

producer:

itextmessage msg = prod.createtextmessage();
msg.text = i.tostring();
msg.properties.setstring("myfilter", "test1");
console.writeline("sending: " + i.tostring());
prod.send(msg, apache.nms.msgdeliverymode.nonpersistent, apache.nms.msgpriority.normal, timespan.minvalue);

consumer:

//生成consumer时通过参数设置selector
imessageconsumer consumer = session.createconsumer(new apache.nms.activemq.commands.activemqqueue("testing"), "myfilter='test1'");

更多关于c#相关内容感兴趣的读者可查看本站专题:《c#窗体操作技巧汇总》、《c#常见控件用法教程》、《winform控件用法总结》、《c#程序设计之线程使用技巧总结》、《c#操作excel技巧总结》、《c#中xml文件操作技巧汇总》、《c#数据结构与算法教程》、《c#数组操作技巧总结》及《c#面向对象程序设计入门教程

希望本文所述对大家c#程序设计有所帮助。