欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

消息中间件--ActiveMQ&JMS消息服务

程序员文章站 2022-09-24 19:02:34
### 消息中间件 ### **消息中间件** 1. 消息中间件的概述 2. 消息中间件的应用场景(查看大纲文档,了解消息队列的应用场景) * 异步处理 * 应用解耦 * 流量削峰 * 消息通信 ### JMS消息服务 ### **JMS的概述** 1. JMS消息服务的概述 2. JMS消息模型 ......

### 消息中间件 ###

 
----------
 

**消息中间件**

 
1. 消息中间件的概述
2. 消息中间件的应用场景(查看大纲文档,了解消息队列的应用场景)
* 异步处理
* 应用解耦
* 流量削峰
* 消息通信
 
----------
 

### JMS消息服务 ###

 
----------
 

**JMS的概述**

 
1. JMS消息服务的概述
2. JMS消息模型
* P2P模式
* Pub/Sub模式
 
3. 消息消费的方式
* 同步的方式---手动
* 异步的方式---listener监听
 
4. JMS编程模型
 
----------

 

### 消息中间件:ActiveMQ ###

 
----------
 

**ActiveMQ的下载与安装**

 
1. ActiveMQ的下载与安装
* 在资料中找到ActiveMQ的压缩文件,解压apache-activemq-5.14.5-bin.zip文件
* 双击运行:activemq.bat文件,启动服务
 
2. 测试ActiveMQ是否安装成功
* 打开浏览器,输入:http://localhost:8161
 
3. 点击Manage ActiveMQ broker连接,可以查看ActiveMQ中已经发布的消息等
* 用户名密码都是:admin
 
----------
 

**ActiveMQ的消息队列方式入门**(P2P模式)

 
1. 在父工程的pom.xml文件中引入ActiveMQ和Spring整合JMS的坐标依赖(项目中已经引入)
<!-- activemq start -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.2.0</version>
</dependency>
 
<!-- activemq end -->
 
<!-- spring 与 mq整合 start -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.2.4.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.7</version>
</dependency>
<!-- spring 与 mq整合 end -->
 
2. ActiveMQ的向消息队列中发送消息的入门程序(没有使用Spring整合JMS的方式)
@Test
public void sendQueueMessage() throws JMSException {
// 1 创建连接工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
 
// 2 使用工厂,创建连接
Connection connection = factory.createConnection();
 
// 3 启动连接
connection.start();
 
// 4 使用连接,创建会话,true表示开始事务,代码执行后需要提供事务
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
 
// 5 创建队列队形(myQueue--队列的名字)/topic-----------session创建
Queue queue = session.createQueue("myQueue");
// 6 创建生产者-----------session创建
MessageProducer producer = session.createProducer(queue);
// 7 创建消息----文本消息-------session创建
TextMessage message = session.createTextMessage();
message.setText("helloworld!!!");
 
// 8 发送消息
producer.send(message);
 
// 9 提交事务
session.commit();
session.close();
connection.close();
}
 
3. ActiveMQ从消息队列中获取消息
@Test
public void receiverQueueMessage() throws JMSException {
// 1 创建连接工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
// 2 使用工厂,创建连接
Connection connection = factory.createConnection();
// 3 启动连接
connection.start();
// 4 使用连接,创建会话,true表示开始事务,代码执行后需要提供事务
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 5 创建队列队形(hello--队列的名字)/topic-----------session创建
Queue queue = session.createQueue("myQueue");
// 6 创建消费者-----------session创建
MessageConsumer consumer = session.createConsumer(queue);
 
// 7 接收消息----text格式
TextMessage receive = (TextMessage) consumer.receive();
String text = receive.getText();
System.out.println("接收到的消息====" + text);
 
// 8 提交事务
session.commit();
session.close();
connection.close();
 
}
 
4. 使用监听器的方式,从队列中消费消息
/**
*异步方式
Queue接受用Listener方式接受,多用
如果有多个监听listener,则交替执行
* @throws Exception
*/
@Test
public void receiverQueueListener() throws Exception{
// 1 创建连接工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
// 2 使用工厂,创建连接
Connection connection = factory.createConnection();
// 3 启动连接
connection.start();
// 4 使用连接,创建会话,true表示开始事务,代码执行后需要提供事务//死循环的不能用事物
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5 创建队列队形(hello--队列的名字)/topic-----------session创建
Queue queue = session.createQueue("myQueue");
// 6 创建消费者-----------session创建
MessageConsumer consumer = session.createConsumer(queue);
 
//7 // 给消费者添加监听器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message msg) {
TextMessage message = (TextMessage) msg;
try {
System.out.println("Listener1111111111接收到的消息是=="+message.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
 
while(true){}
// 使用监听器的方式不能关闭,需要监听器一直工作
// session.commit();
// session.close();
// connection.close();
}
 
 

**ActiveMQ的消息订阅方式入门**(Pub/Sub模式)

/**
* Topic发送
* @throws JMSException
*/
@Test
public void sendTopicMessage() throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
 
// 创建消息订阅
Topic topic = session.createTopic("myTopic");
// 创建生产者
MessageProducer producer = session.createProducer(topic);
// 创建消息,一组可以存储key value的消息
MapMessage message = session.createMapMessage();
message.setString("username", "cgx");
message.setString("password", "123456");
// 发送消息
producer.send(message);
// 提交事务
session.commit();
session.close();
connection.close();
}
 
/**
* Topic接受
*
* @throws JMSException
*/
@Test
public void testReceiverMessage() throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
 
// 创建消息订阅
Topic topic = session.createTopic("myTopic");
// 创建消费者
MessageConsumer consumer = session.createConsumer(topic);
// 接收消息
MapMessage message = (MapMessage) consumer.receive();
System.out.println(message.getString("username"));
System.out.println(message.getString("password"));
 
session.commit();
session.close();
connection.close();
}
 
/**
* Topic接受Listener监听方式
*
* @throws Exception
*/
@Test
public void receiverQueueListener() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建消息订阅
Topic topic = session.createTopic("myTopic");
// 创建消费者
MessageConsumer consumer = session.createConsumer(topic);
 
// 给消费者添加监听器consumer添加监听
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message msg) {
MapMessage message = (MapMessage) msg;
try {
System.out.println(message.getString("username"));
System.out.println(message.getString("password"));
} catch (JMSException e) {
e.printStackTrace();
}
}
});
 
while (true) {
 
}
 
}
 
 
 

### Spring整合ActiveMQ ###★★★★★

 
----------
 
**Spring整合ActiveMQ**★★★★★
 
1. 创建applicationContext-mq.xml的配置文件,导入约束★★★★★
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:task="http://www.springframework.org/schema/task"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
http://www.springframework.org/schema/data/jpa
http://www.springframework.org/schema/data/jpa/spring-jpa.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
 
</beans>
2. 具体的配置如下★★★★★
applicationContext-mq.xml===================mq的消息发送(消息生产者)
<!-- 配置连接工厂 -->
<!-- ActiveMQ 连接工厂 -->
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://localhost:61616" userName="admin" password="admin" />
 
<!-- Spring Caching连接工厂 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- Session缓存数量和链接数有关 -->
<property name="sessionCacheSize" value="100" />
</bean>
 
<!-- 定义JmsTemplate的Queue类型 -->★★★★★
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<constructor-arg ref="connectionFactory" />
<!-- 非pub/sub模型(发布/订阅),即队列模式 -->
<property name="pubSubDomain" value="false" />
</bean>
 
<!-- 定义JmsTemplate的Topic类型 -->★★★★★
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate" >
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<constructor-arg ref="connectionFactory" />
<!-- pub/sub模型(发布/订阅) -->
<property name="pubSubDomain" value="true" />
</bean>
 
3. 发送消息的代码如下★★★★★
1.Queue方式:★★★★★
@Autowired
@Qualifier(value="jmsQueueTemplate")
private JmsTemplate queueTemplate;//Queue
 
 
/**
* Queue发送消息---spring框架
*/
@Test
public void sendQueueMessage() {
// 发送消息 构造参数指定目标,因为配置文件中的队列和订阅模式是通过id与false和true进行区分
queueTemplate.send("myQueue", new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
// 使用session创建消息,发送
TextMessage textMessage = session.createTextMessage("测试结合spring框架发送queue消息");
return textMessage;
}
});
}
2.Topic方式:★★★★★
@Autowired
@Qualifier(value = "jmsTopicTemplate")
private JmsTemplate topicTemplate;//Topic
 
/**
* Topic发送消息---spring框架
*/
@Test
public void sendTopicMessage() {
topicTemplate.send("spring_topic", new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("username", "mdzz");
return mapMessage;
}
});
}
 
4. 接收消息的代码如下==========不提倡手动,要用监听器异步获取
/**
* Queue接收消息---spring框架
* 同步手动:不提倡
* receive("myQueue")要写目标,不写目标的话会报找不到目标的错误NO defaultDestination
*/
@Test
public void receiverMessage() {
//接收消息textMessage类型
TextMessage textMessage = (TextMessage) queueTemplate.receive("myQueue");
 
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
 
 
**Spring配置监听器**★★★★★★★★★★★★★★★
 
1. 自定义监听器代码的编写----接收消息---spring框架---实现MessageListener接口★★★★★
1.Queue:★★★★★
@Component(value="queueConsumer1")
public class QueueListener implements MessageListener {
 
@Override
public void onMessage(Message arg0) {
// 把arg0强转
TextMessage textMessage = (TextMessage) arg0;
try {
// 输出消息
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
 
}
 
2.Topic:发送一个,两个都会接受★★★★★topic特点:有几个监听几个都会同时收到
@Component
public class TopicConsumer1 implements MessageListener {
 
@Override
public void onMessage(Message arg0) {
MapMessage mapMessage = (MapMessage) arg0;
try {
System.out.println("TopicConsumer1===="+mapMessage.getString("username"));
} catch (JMSException e) {
e.printStackTrace();
}
}
 
}
 
@Component
public class TopicConsumer2 implements MessageListener {
 
2. 编写配置文件
applicationContext-mq-consumer.xml=============mq的消息接受(负责监听接受消息)
<!-- 扫描包 -->
<context:component-scan base-package="com.itcast.jms.consumer" />
 
<!-- ActiveMQ 连接工厂 -->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://localhost:61616" userName="admin" password="admin" />
 
<!-- Spring Caching连接工厂 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<property name="sessionCacheSize" value="100" />
</bean>
 
<!-- Spring JmsTemplate 的消息生产者 start-->
<jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory">
<jms:listener destination="myQueue" ref="queueConsumer1"/>---------自定义监听★★★★★
</jms:listener-container>
 
<jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory">
<jms:listener destination="spring_topic" ref="topicConsumer1"/>---------自定义监听★★★★★
<jms:listener destination="spring_topic" ref="topicConsumer2" />---------自定义监听★★★★★
</jms:listener-container>
 
 
3.不用启动项目,把spring配置文件applicationContext-mq-consumer.xml启动起来,可以用采用下面方法
新建一个test类,让他一直启动着,这样就一直加载spring的配置文件
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:applicationContext-mq-consumer.xml")
public class SpringQueueListenerTest {
 
@Test
public void test(){
while(true);
}
}
4.只要发送端(发送消息---spring框架)一启动,监听器就会监听到,就会输出:测试结合spring框架发送queue消息★★★★★

总结:

消息发送
1. 创建spring容器
2. 从容器中获取JMSTemplate对象,发送消息
3. 定义Destination
4. 使用JMSTemplate对象发送消息
消息接受
1. 创建一个类实现MessageListener 接口。业务处理在此类中实现。
2.在spring容器中配置DefaultMessageListenerContainer对象,引用MessageListener 实现类对象接收消息。

项目整合ActiveMQ:

1. 消息生产者整合ActiveMQ
消息生产者只需要发送消息
需要把JMSTemplate和Destination交给spring进行管理

 

/**===========================activeMQ消息发送========================================*/
// 发送消息!!!
this.send("save", item.getId());
}

@Autowired
private JmsTemplate jmsTemplate;

@Autowired
private Destination destination;

/**
* 此方法就是用来发送消息的
* 考虑:1、发送什么数据?2、我需要什么数据?
* 在消息中需要:1、消息的标识:save,delete,update;2、商品的ID
*/
private void send(final String type, final Long itemId) {
// TODO Auto-generated method stub
jmsTemplate.send(destination, new MessageCreator() {

@Override
public Message createMessage(Session session) throws JMSException {
//创建消息体
TextMessage textMessage = new ActiveMQTextMessage();
//设置消息内容
Map<String, Object> map = new HashMap<>();
map.put("type", type);
map.put("itemId", itemId);
try {
ObjectMapper mapper = new ObjectMapper();
textMessage.setText(mapper.writeValueAsString(map));
} catch (Exception e) {
e.printStackTrace();
}
return textMessage;
}
});
}

/**===========================activeMQ消息发送========================================*/
2. 消息消费改造
在taotao-search-service添加
ItemMessageListener:
@Autowired
private SearchService searchService;

@Override
public void onMessage(Message message) {
//先判断此消息类型是否是TextMessage
if(message instanceof TextMessage){
//如果是,强转
TextMessage textMessage = (TextMessage)message;
try {
//获取消息:json
String json = textMessage.getText();
//杰克逊第三作用:直接解析json数据
ObjectMapper mapper = new ObjectMapper();
JsonNode jsonNode = mapper.readTree(json);
String type = jsonNode.get("type").asText();
Long itemId = jsonNode.get("itemId").asLong();
//根据解析出来的type,判断此type=save的时候我应该调用indexSearch方法
if("save".equals(type)){
searchService.indexItem(itemId);
}

} catch (Exception e) {
e.printStackTrace();
}
}

}

SearchServiceImpl:
@Override
public void indexItem(Long itemId) throws Exception {
Item item = this.itemMapper.selectByPrimaryKey(itemId);

SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", item.getId());
doc.addField("item_title", item.getTitle());
doc.addField("item_image", item.getImage());
doc.addField("item_cid", item.getCid());
doc.addField("item_price", item.getPrice());
doc.addField("item_status", item.getStatus());

this.cloudSolrServer.add(doc);

this.cloudSolrServer.commit();

}