欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

springboot 整合 RabbitMQ实现消息队列

程序员文章站 2022-06-28 17:09:59
springboot 整合 RabbitMQ消息队列作为分布式系统中重要的组件,可以有效解决应用耦合,异步消息,流量削锋等系列问题,有利于实现高性能,高可用,可伸缩和最终一致性架构。可应用在业务解耦,消息异步分发(提高应用响应速度)。本文主要讲述springboot部署rabbitmq的一些简单消息发送。依赖包 org.springframewo...

springboot 整合 RabbitMQ

消息队列作为分布式系统中重要的组件,可以有效解决应用耦合,异步消息,流量削锋等系列问题,有利于实现高性能,高可用,可伸缩和最终一致性架构。可应用在业务解耦,消息异步分发(提高应用响应速度)。本文主要讲述springboot部署rabbitmq的一些简单消息发送。

依赖包

<!-- 消息队列 rabbitmq  依赖包 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-- 消息队列 rabbitmq  依赖包 end --> 

配置

写在application.properties文件

## ==============rabbitmq配置========= 后台 http://ip地址:15672/ spring.rabbitmq.host=ip地址
spring.rabbitmq.port= 5672 spring.rabbitmq.username=root
spring.rabbitmq.password=123456789 

rabbitmq配置

消息名称

package com.sise.demo1.demo.common.config.amqp; /**
 * author zxq
 * date 2020/8/6 23:27
 * 消息名称
 */ public class MQField { public static final String HELLO_STRING_QUEUE = "stringQueue"; public static final String HELLO_USER_QUEUE = "userQueue"; public static final String MY_WORKER_QUEUE = "workerQueue"; public static final String MY_FANOUT_A_QUEUE = "fanoutAQueue"; public static final String MY_FANOUT_B_QUEUE = "fanoutBQueue"; public static final String MY_FANOUT_EXCHANGE = "fanoutQueue"; public static final String MY_TOPIC_A_QUEUE = "topicAQueue"; public static final String MY_TOPIC_B_QUEUE = "topicBQueue"; public static final String MY_TOPIC_EXCHANGE = "topicExchange"; } 

配置类

package com.sise.demo1.demo.common.config.amqp; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /**
 * author zxq
 * date 2020/8/6 23:25
 * RabbitMQ消息队列配置类
 */ @Configuration public class RabbitMQConfig { /**
     * 声明接收字符串的队列 Hello 默认
     * @return
     */ /**
     * 声明接收字符串的队列 Hello 默认
     *
     * @return
     */ @Bean public Queue stringQueue() { //boolean isDurable = true;//是否持久化 //boolean isExclusive = false;  //仅创建者可以使用的私有队列,断开后自动删除 //boolean isAutoDelete = false;  //当所有消费客户端连接断开后,是否自动删除队列 //Queue queue = new Queue(MQField.HELLO_STRING_QUEUE, isDurable, isExclusive, isAutoDelete); //return  queue; //return new Queue(MQField.HELLO_STRING_QUEUE); //默认支持持久化 return QueueBuilder.durable(MQField.HELLO_STRING_QUEUE) //.exclusive() //.autoDelete() .build(); } /**
     * 声明接收user对象的队列 Hello  支持持久化
     *
     * @return
     */ @Bean public Queue userQueue() { return QueueBuilder.durable(MQField.HELLO_USER_QUEUE).build(); } /**
     * 声明WorkQueue队列 competing consumers pattern,多个消费者不会重复消费队列的相同消息
     *
     * @return
     */ @Bean public Queue workQueue() { return QueueBuilder.durable(MQField.MY_WORKER_QUEUE).build(); } /**
     * 消息队列中最常见的模式:发布订阅模式
     * <p>
     * 声明发布订阅模式队列 Publish/Subscribe
     * <p>
     * exchange类型包括:direct, topic, headers 和 fanout
     **/ /*fanout(广播)队列相关声明开始*/ @Bean public Queue fanOutAQueue() { return QueueBuilder.durable(MQField.MY_FANOUT_A_QUEUE).build(); } @Bean public Queue fanOutBQueue() { return QueueBuilder.durable(MQField.MY_FANOUT_B_QUEUE).build(); } @Bean FanoutExchange fanoutExchange() { return (FanoutExchange) ExchangeBuilder.fanoutExchange(MQField.MY_FANOUT_EXCHANGE).build(); //return new FanoutExchange(MQField.MY_FANOUT_EXCHANGE); } @Bean Binding bindingExchangeA(Queue fanOutAQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutAQueue).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue fanOutBQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutBQueue).to(fanoutExchange); } /*fanout队列相关声明结束*/ /*topic队列相关声明开始*/ @Bean public Queue topicAQueue() { return QueueBuilder.durable(MQField.MY_TOPIC_A_QUEUE).build(); } @Bean public Queue topicBQueue() { return QueueBuilder.durable(MQField.MY_TOPIC_B_QUEUE).build(); } @Bean TopicExchange topicExchange() { return (TopicExchange) ExchangeBuilder.topicExchange(MQField.MY_TOPIC_EXCHANGE).build(); }

业务层

消息生产者

package com.sise.demo1.demo.model.service.amqpService.object; import com.sise.demo1.demo.common.config.amqp.MQField; import com.sise.demo1.demo.model.entity.User; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /**
 * author zxq
 * date 2020/8/7 20:47
 */ @Component public class Sender { @Autowired private AmqpTemplate rabbitTemplate; public boolean send(User user){ boolean isOk = false; rabbitTemplate.convertAndSend(MQField.HELLO_USER_QUEUE,user); isOk = true; System.out.println("Sender发送对象:"+isOk); return isOk; } } 

//消息消费者

package com.sise.demo1.demo.model.service.amqpService.object; import com.sise.demo1.demo.common.config.amqp.MQField; import com.sise.demo1.demo.model.entity.User; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /**
 * author zxq
 * date 2020/8/7 20:46
 */ @Component public class Receive { @RabbitListener(queues = MQField.HELLO_USER_QUEUE) @RabbitHandler public void process(User user){ try { Thread.sleep(5000); } catch (Exception e) { e.printStackTrace(); } System.out.println("Receiver接收到的对象是 => " + user.toString()); } } 

控制层

package com.sise.demo1.demo.controller.amqp; import com.sise.demo1.demo.common.ResultVO; import com.sise.demo1.demo.model.entity.User; import com.sise.demo1.demo.model.service.IUserService; import com.sise.demo1.demo.model.service.amqpService.object.Sender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import org.springframework.util.StringUtils; /**
 * author zxq
 * date 2020/8/6 23:49
 */ @RestController @RequestMapping(value = "/test") public class Test { @Autowired private IUserService userService; @Autowired private Sender senderObject; @RequestMapping(value = "/object") public ResultVO object_index(@RequestParam(value = "userId", required = false) Integer userId){ if(StringUtils.isEmpty(userId)){ return ResultVO.getError("缺少参数 userId"); } User user = userService.get(userId); boolean bool = senderObject.send(user); return ResultVO.getSuccessWithData("发送:"+bool,user); } } 

运行截图

springboot 整合 RabbitMQ实现消息队列
springboot 整合 RabbitMQ实现消息队列
springboot 整合 RabbitMQ实现消息队列

此处已经简单地实现了一端发消息,一端进行了接收消息。

如有错误,或有更好的想法,请联系我,感激不尽!

本文地址:https://blog.csdn.net/weixin_41908336/article/details/107885311

相关标签: Java 消息队列