ActiveMQ学习(一)
程序员文章站
2022-07-15 09:43:36
...
文章目录
为什么要引入消息中间件
- 要做到系统解耦,当新的模块接进来时,可以做到代码改动最小:能够解耦
- 设置流量缓冲池,可以让后端系统按照自身吞吐能力进行消费,不被冲垮:能够削峰
- 强弱依赖梳理能将非关键调用链路的操作异步化并提升整体系统的吞吐能力:能够异步
什么是ActiveMQ
MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。
特点:
- 支持多种语言编写客户端
- 对spring的支持,很容易和spring整合
- 支持多种传输协议:TCP,SSL,NIO,UDP等
- 支持AJAX
消息形式:
- 点对点(queue)
- 一对多(topic)
点对点模式(point-to-point)
- 每个消息只能有一个消费者,类似1对1的关系。
- 消息的生产和消费者之间没有时间上的相关性 ,无论消费者在生产者发送消息的时候是否处于运行状态,消费者都可以提取消息。
- 消息被消费后队列中不会再存储,所以消费者不会消费到已经被消费掉的消息。
消息生产者编码(Queue)
//1.创建连接工程
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
//2.通过连接工厂,获得连接并启动访问
Connection connection = connectionFactory.createConnection();
connection.start();
//3.创建会话session
//两个参数,第一个叫事务,第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地Destination(是队列queue还是主题topic)
Queue queue = session.createQueue("activemq-queue");
//5.创建消息的生产者
MessageProducer producer = session.createProducer(queue);
//6.通过使用producer生产3条消息发送到mq的队列中
for (int i=0;i<3;i++){
//7.创建消息
TextMessage textMessage = session.createTextMessage("这是test的第" + i + "条message-----");
//8.通过Producer发送给queue
producer.send(textMessage);
}
//9.关闭资源
producer.close();
session.close();
connection.close();
System.out.println("消息发布到mq完成");
消息消费者编码(Queue,同步阻塞和监听器两种消费方式)
//1.创建连接工程
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
//2.通过连接工厂,获得连接并启动访问
Connection connection = connectionFactory.createConnection();
connection.start();
//3.创建会话session
//两个参数,第一个叫事务,第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地Destination(是队列queue还是主题topic)
Queue queue = session.createQueue("activemq-queue");
//5.创建消费者
MessageConsumer consumer = session.createConsumer(queue);
/*
//同步阻塞方式receive()
//订阅者或接受者调用consumer的receive方法来接收消息,receive方法再能够接收到消息(或超时之前)之前将一直阻塞
while (true){
TextMessage textMessage = (TextMessage)consumer.receive();
if (textMessage!=null){
System.out.println(textMessage.getText());
}else {
break;
}
}
//关闭资源
consumer.close();
session.close();
connection.close();
*/
//通过监听器的方式来消费消费
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message!=null && message instanceof TextMessage){
try {
System.out.println(((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read();//让资源不关闭,否则还没等到监听,连接就断开了
//关闭资源
consumer.close();
session.close();
connection.close();
发布订阅模式(publish-and-subsrcibe)
- 生产者将消息发布到topic中,每个消息可以有很多个消费者,属于1:N的关系。
- 生产者和消费者之有时间上的相关性。订阅某一个主题的消费者只能消费自订阅后之后发布的消息。
- 生产者生产时,topic 不保存消息,它是无状态的,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者。
- JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。**持久订阅允许消费者消费它在未处于**状态时发送的消息。**就好比我们的微信公众号的订阅。
发布消息编码(Topic)
//1.创建连接工程
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
//2.通过连接工厂,获得连接并启动访问
Connection connection = connectionFactory.createConnection();
connection.start();
//3.创建会话session
//两个参数,第一个叫事务,第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地Destination(是队列queue还是主题topic)
Topic topic = session.createTopic("activemq-topic");
//5.创建生产者
MessageProducer producer = session.createProducer(topic);
for (int i=0;i<3;i++){
TextMessage textMessage = session.createTextMessage("这是第" + i + "条topic测试");
producer.send(textMessage);
}
//关闭资源
producer.close();
session.close();
connection.close();
System.out.println("消息发布到mq完成");
消费消息编码(Topic)
System.out.println("我是一号消费者");
//1.创建连接工程
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
//2.通过连接工厂,获得连接并启动访问
Connection connection = connectionFactory.createConnection();
connection.start();
//3.创建会话session
//两个参数,第一个叫事务,第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地Destination(是队列queue还是主题topic)
Topic topic = session.createTopic("activemq-topic");
//5.创建消费者
MessageConsumer consumer = session.createConsumer(topic);
//通过监听器的方式来消费消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message!=null && message instanceof TextMessage){
try {
System.out.println(((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read();//让资源不关闭,否则还没等到监听,连接就断开了
//关闭资源
consumer.close();
session.close();
connection.close();