ActiveMQ学习笔记(一)
程序员文章站
2022-07-15 09:50:13
...
能干嘛?
解耦:系统间不直接调用,代码改动少,如A发送json给B,A发送xml给C,在接入D系统的时候又要发送其他格式的数据
削峰:控制并发流量,不直接请求我,当我处理完了才会去中间人取下一个请求
异步:不需要立即响应,减少等待时间
发送消息
package cn.fg.jsm.producer;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
//消息生产者
public class Producer {
public static void main(String[] args) throws JMSException {
//1.创建连接工厂,brokerURL(可以理解为实例):ActiveMQ实例的地址
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.1.3:61616");
//2.通过工厂创建连接
Connection connection = factory.createConnection();
connection.start(); //开始连接
//3.创建回话,两个参数:transacted 事务,acknowledgeMode 签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(队列或主题)
//Destination destination = session.createQueue("queue01"); //Destination 是接口 Queue、Topic 继承Destination
Queue queue = session.createQueue("queue01"); //创建队列
//Topic topic = session.createTopic("topic01"); //创建主题
//5.创建消息生产者,参数:destination 目的地
MessageProducer messageProducer = session.createProducer(queue);
//6.创建消息
TextMessage textMessage = session.createTextMessage("这是一个消息" + System.currentTimeMillis());
//7.发送消息
messageProducer.send(textMessage);
//8.释放资源
messageProducer.close();
session.close();
connection.close();
System.out.println("消息发送完毕");
}
}
访问管理控制台
接收消息(阻塞)
package cn.fg.jsm.producer;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
//消息消费者
public class Consumer {
public static void main(String[] args) throws JMSException {
//1.创建连接工厂,brokerURL(可以理解为实例):ActiveMQ实例的地址
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.1.3:61616");
//2.通过工厂创建连接
Connection connection = factory.createConnection();
connection.start(); //开始连接
//3.创建回话,两个参数:transacted 事务,acknowledgeMode 签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(队列或主题)
//Destination destination = session.createQueue("queue01"); //Destination 是接口 Queue、Topic 继承Destination
Queue queue = session.createQueue("queue01"); //创建队列
//Topic topic = session.createTopic("topic01"); //创建主题
//5.创建消费者,参数:destination 目的地
MessageConsumer messageConsumer = session.createConsumer(queue);
//6.接收消息,同步阻塞
while (true) {
//接收消息,返回Message接口,这里使用TextMessage,因为发送消息使用的是TextMessage
//TextMessage textMessage = (TextMessage) messageConsumer.receive(); //这个方法将一直等待,当有消息了才会执行下面的方法,每次获取一条消息
TextMessage textMessage = (TextMessage) messageConsumer.receive(10000); //只等10秒钟,没有消息来我就执行下面的代码了
if (textMessage != null) {
System.out.println(textMessage.getText());
} else {
break;
}
}
//7.释放资源
messageConsumer.close();
session.close();
connection.close();
}
}
访问管理控制台
使用监听器接收消息 (非阻塞)
package cn.fg.jsm.listener;
import java.io.IOException;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Listener {
public static void main(String[] args) throws JMSException, IOException {
//1.创建连接工厂,brokerURL(可以理解为实例):ActiveMQ实例的地址
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.1.3:61616");
//2.通过工厂创建连接
Connection connection = factory.createConnection();
connection.start(); //开始连接
//3.创建回话,两个参数:transacted 事务,acknowledgeMode 签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(队列或主题)
Queue queue = session.createQueue("queue01"); //创建队列
//5.创建消费者,参数:destination 目的地
MessageConsumer messageConsumer = session.createConsumer(queue);
//6.创建监听器,MessageListener是一个接口,可以自己自定义监听类实现接口,这里就使用一个内部类演示
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//消息到了之后的逻辑处理
if (message != null && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
//7.释放资源
System.in.read(); //控制台按任意键继续,这里阻塞一下,如果关闭了资源,消息就收不到了
messageConsumer.close();
session.close();
connection.close();
}
}
如果有多个消费者同时接收同一个队列名称,则消费者会轮流接收到消息,例如生产者发送6条消息,消费者A收到135,消费者B收到246
总结
1、JMS开发步骤
2、 队列是点对点的消息传递,每个消息一般只设定被一个消费者接收,如果要被多个消费接收,就要使用主题了(后面介绍)
3、无论消费者是否在线,只要没有被消费,就会一直存在(可以登录控制台手动清除),但是一旦被消费就不会再次出现。