欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

activeMQ学习笔记二(ActiveMQ安装及简单java访问)

程序员文章站 2022-07-15 08:59:31
...

1. ActiveMQ的安装(开源的MOM消息中间件)

  1. 从官网下载安装包,http://activemq.apache.org/download.html,解压安装 
    tar -zxvf apache-activemq-5.13.4-bin.tar.gz
  2. 赋予运行权限 chmod +x,windows可以忽略此步
  3. 配置vim /usr/lical/activemq-1/conf/activemq.xml
  4. 启动 cd /usr/local/activemq-1/bin ./activemq start

启动后,activeMQ会占用两个端口,一个是负责接收发送消息的tcp端口:61616,一个是基于web负责用户界面化管理的端口:8161。这两个端口可以在conf下面的xml中找到。http服务器使用了jettry。
这里有个问题是启动mq后,很长时间管理界面才可以显示出来。

2. 用Java访问ActiveMQ

先附上Bean代码:

public class MqBean implements Serializable{
    private Integer age;
    private String name;
    public Integer getAge() {
        return age;
    }
    public void setAge(Integer age) {
        this.age = age;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
}

2.1 队列消息的发送:

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    Connection connection;
    Session session;
    Destination destination;
    MessageProducer producer;
    connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");
    try {
        connection = connectionFactory.createConnection();
        connection.start();
        //第一个参数是是否是事务型消息,设置为true,第二个参数无效
        //第二个参数是
        //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。异常也会确认消息,应该是在执行之前确认的
        //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会删除消息。可以在失败的
        //时候不确认消息,不确认的话不会移出队列,一直存在,下次启动继续接受。接收消息的连接不断开,其他的消费者也不会接受(正常情况下队列模式不存在其他消费者)
        //DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。在需要考虑资源使用时,这种模式非常有效。
        //待测试
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        destination = session.createQueue("test-queue");
        producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        //优先级不能影响先进先出。。。那这个用处究竟是什么呢呢呢呢
        MqBean bean = new MqBean();
        bean.setAge(13);
        for(int i=0;i<100;i++){
            bean.setName("小黄"+i);
            producer.send(session.createObjectMessage(bean));
        }
        producer.close();
        System.out.println("呵呵");
    } catch (JMSException e) {
        e.printStackTrace();
    }
}

注:在上面的代码中,确认模式有三种,里面的DUPS_OK_ACKNOWLEDGE和AUTO_ACKNOWLEDGE一直没明白有什么区别。因为无法测试。不过大概也明白了一些。其实主要是MQ处理消息的流程决定的:

  1. 消息从生成方客户端传送到消息服务器。
  2. 消息服务器读取消息。
  3. 消息被放置到持久性存储器当中(出于可靠性的考虑)。
  4. 消息服务器确认收到消息(出于可靠性的考虑)。
  5. 消息服务器确定消息的路由。
  6. 消息服务器写出消息。
  7. 消息从消息服务器传送到使用方客户端。
  8. 使用方客户端确认收到消息(出于可靠性的考虑)。
  9. 消息服务器处理客户端确认(出于可靠性的考虑)。
  10. 消息服务器确定已经处理客户端确认。

这些步骤是连续的,所以任何步骤都可能成为消息从生成方客户端到使用方客户端的传送过程的瓶颈。这些步骤中的大多数都取决于消息传送系统的物理特征:网络带宽、计算机处理速度和消息服务器体系结构等等。但是,有一些步骤还取决于消息传送应用程序的特征和该应用程序要求的可靠性级别。
其实就是基于可靠性还是性能的选择.

2.2 队列消息的接收:

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    // Connection :JMS 客户端到JMS Provider 的连接  
    Connection connection = null;
    // Session: 一个发送或接收消息的线程  
    Session session;
    // Destination :消息的目的地;消息发送给谁.  
    Destination destination;
    // 消费者,消息接收者  
    MessageConsumer consumer;
    connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");
    try {
        // 构造从工厂得到连接对象  
        connection = connectionFactory.createConnection();
        // 启动  
        connection.start();
        // 获取操作连接  
        //这个最好还是有事务
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置  
        destination = session.createQueue("test-queue");
        consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    MqBean bean = (MqBean) ((ObjectMessage)message).getObject();
                    System.out.println(bean);
                    if (null != message) {
                        System.out.println("收到消息" + bean.getName());
                    }
                } catch (Exception e) {
                    // TODO: handle exception
                }
            }
        });
    } catch (Exception e) {
        e.printStackTrace();
    }
}

注:对于队列来说,比较简单的优化策略,应该就是队列分载了。由于每个消费者都是单线程的,所以可以设置多个消费者来提高速度。
大家可以复制个消费者自己测试下,在消费者中添加sleep测试下效果。

2.3 订阅消息的发送

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    Connection connection;
    Session session;
    Destination destination;
    MessageProducer producer;
    connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");
    try {
        connection = connectionFactory.createConnection();
        connection.start();
        
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        destination = session.createTopic("test-topic");
        producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        //优先级不能影响先进先出。。。那这个用处究竟是什么呢呢呢呢
        MqBean bean = new MqBean();
        bean.setAge(13);
        for(int i=0;i<100;i++){
            Thread.sleep(1000);
            bean.setName("小黄"+i);
            producer.send(session.createObjectMessage(bean));
        }
        producer.close();
        System.out.println("呵呵");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

2.4 订阅消息的接收

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    // Connection :JMS 客户端到JMS Provider 的连接  
    Connection connection = null;
    // Session: 一个发送或接收消息的线程  
    Session session;
    // Destination :消息的目的地;消息发送给谁.  
    Destination destination;
    // 消费者,消息接收者  
    MessageConsumer consumer;
    connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");
    try {
        // 构造从工厂得到连接对象  
        connection = connectionFactory.createConnection();
        // 启动  
        connection.start();
        // 获取操作连接  
        //这个最好还是有事务
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置  
        destination = session.createQueue("test-queue");
        consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    MqBean bean = (MqBean) ((ObjectMessage)message).getObject();
                    System.out.println(bean);
                    if (null != message) {
                        System.out.println("收到消息" + bean.getName());
                    }
                } catch (Exception e) {
                    // TODO: handle exception
                }
            }
        });
    } catch (Exception e) {
        e.printStackTrace();
    }
}

以上的消息发送后,如果没有接收到,可以登录自己的MQ管理页面:http://192.168.3.159:8161/admin/ ,默认帐号密码都是admin,查看队列中的消息

activeMQ学习笔记二(ActiveMQ安装及简单java访问)

Number Of Pending Messages 等待消费的消息 这个是当前未出队列的数量。可以理解为总接收数-总出队列数
Messages Enqueued 进入队列的消息 进入队列的总数量,包括出队列的。 这个数量只增不减

Messages Dequeued 出了队列的消息 可以理解为是消费这消费掉的数量

MOM

 

(分布式系统的集成)

 编辑
MOM是Message Oriented Middleware的英文缩写,指的是利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。