欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

SpringBoot+RabbitMq具体使用的几种姿势

程序员文章站 2023-12-01 12:30:10
目前主流的消息中间件有activemq,rabbitmq,rocketmq,kafka,我们要根据实际的业务场景来选择一款合适的消息中间件,关注的主要指标有,消息投递的可靠...

目前主流的消息中间件有activemq,rabbitmq,rocketmq,kafka,我们要根据实际的业务场景来选择一款合适的消息中间件,关注的主要指标有,消息投递的可靠性,可维护性,吞吐量以及中间件的特色等重要指标来选择,大数据领域肯定是kafka,那么传统的业务场景就是解耦,异步,削峰。那么就在剩下的3款产品中选择一款,从吞吐量,社区的活跃度,消息的可靠性出发,一般的中小型公司选择rabbitmq来说可能更为合适。那么我们就来看看如何使用它吧。

环境准备

本案例基于springboot集成rabbitmq,本案例主要侧重要实际的code,对于基础理论知识请自行百度。

jdk-version:1.8

rabbitmq-version:3.7

springboot-version:2.1.4.release

pom文件

 <dependency>
 <groupid>org.springframework.boot</groupid>
 <artifactid>spring-boot-starter-amqp</artifactid>
</dependency>

yml配置文件

spring:
 rabbitmq:
 password: guest
 username: guest
 port: 5672
 addresses: 127.0.0.1
 #开启发送失败返回
 publisher-returns: true
 #开启发送确认
 publisher-confirms: true
 listener:
  simple:
  #指定最小的消费者数量.
  concurrency: 2
  #指定最大的消费者数量.
  max-concurrency: 2
  #开启ack
  acknowledge-mode: auto
  #开启ack
  direct:
  acknowledge-mode: auto
 #支持消息的确认与返回
 template:
  mandatory: true

配置rabbitmq的姿势

姿势一

基于javaconfig

package com.lly.order.message;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

/**
 * @classname rabbitmqconfig
 * @description rabbitmq配置类
 * @author lly
 * @date 2019-05-13 15:05
 * @version 1.0
 **/
@configuration
public class rabbitmqconfig {

 public final static string direct_queue = "directqueue";
 public final static string topic_queue_one = "topic_queue_one";
 public final static string topic_queue_two = "topic_queue_two";
 public final static string fanout_queue_one = "fanout_queue_one";
 public final static string fanout_queue_two = "fanout_queue_two";

 public final static string topic_exchange = "topic_exchange";
 public final static string fanout_exchange = "fanout_exchange";

 public final static string topic_routingkey_one = "common_key";
 public final static string topic_routingkey_two = "*.key";

// direct模式队列
 @bean
 public queue directqueue() {
  return new queue(direct_queue, true);
 }
// topic 订阅者模式队列
 @bean
 public queue topicqueueone() {
  return new queue(topic_queue_one, true);
 }
 @bean
 public queue topicqueuetwo() {
  return new queue(topic_queue_two, true);
 }
// fanout 广播者模式队列
 @bean
 public queue fanoutqueueone() {
  return new queue(fanout_queue_one, true);
 }
 @bean
 public queue fanoutqueuetwo() {
  return new queue(fanout_queue_two, true);
 }
// topic 交换器
 @bean
 public topicexchange topexchange() {
  return new topicexchange(topic_exchange);
 }
// fanout 交换器
 @bean
 public fanoutexchange fanoutexchange() {
  return new fanoutexchange(fanout_exchange);
 }

// 订阅者模式绑定
 @bean
 public binding topexchangebingingone() {
  return bindingbuilder.bind(topicqueueone()).to(topexchange()).with(topic_routingkey_one);
 }

 @bean
 public binding topicexchangebingingtwo() {
  return bindingbuilder.bind(topicqueuetwo()).to(topexchange()).with(topic_routingkey_two);
 }
// 广播模式绑定
 @bean
 public binding fanoutexchangebingingone() {
  return bindingbuilder.bind(fanoutqueueone()).to(fanoutexchange());
 }
 @bean
 public binding fanoutexchangebingingtwo() {
  return bindingbuilder.bind(fanoutqueuetwo()).to(fanoutexchange());
 }
}

姿势二

基于注解

package com.lly.order.message;

import com.rabbitmq.client.channel;
import lombok.extern.slf4j.slf4j;
import org.springframework.amqp.core.amqptemplate;
import org.springframework.amqp.core.exchangetypes;
import org.springframework.amqp.core.message;
import org.springframework.amqp.rabbit.annotation.exchange;
import org.springframework.amqp.rabbit.annotation.queue;
import org.springframework.amqp.rabbit.annotation.queuebinding;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.amqp.rabbit.connection.correlationdata;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.component;

import java.io.ioexception;
import java.time.localtime;
import java.util.uuid;


/**
 * @classname mqtest
 * @description 消息队列测试
 * @author lly
 * @date 2019-05-13 10:50
 * @version 1.0
 **/
@component
@slf4j
public class mqtest implements rabbittemplate.confirmcallback, rabbittemplate.returncallback {

 private final static string queue = "test_queue";

 @autowired
 private amqptemplate amqptemplate;

 @autowired
 private rabbittemplate rabbittemplate;

 public mqtest(rabbittemplate rabbittemplate) {
  rabbittemplate.setconfirmcallback(this);
  rabbittemplate.setreturncallback(this);
 }

 public void sendmq() {
  rabbittemplate.convertandsend("test_queue", "test_queue" + localtime.now());
  log.info("发送消息:{}", "test_queue" + localtime.now());
 }


 public void sendmqrabbit() {
  //回调id
  correlationdata cid = new correlationdata(uuid.randomuuid().tostring());
//  rabbittemplate.convertandsend(rabbitmqconfig.fanout_exchange, "", "广播者模式测试",cid);
  object object = rabbittemplate.convertsendandreceive(rabbitmqconfig.fanout_exchange, "", "广播者模式测试", cid);
  log.info("发送消息:{},object:{}", "广播者模式测试" + localtime.now(), object);
 }

 //发送订阅者模式
 public void sendmqexchange() {
  correlationdata cid = new correlationdata(uuid.randomuuid().tostring());
  correlationdata cid01 = new correlationdata(uuid.randomuuid().tostring());
  log.info("订阅者模式->发送消息:routing_key_one");
  rabbittemplate.convertsendandreceive("topic_exchange", "routing_key_one", "routing_key_one" + localtime.now(), cid);
  log.info("订阅者模式->发送消息routing_key_two");
  rabbittemplate.convertsendandreceive("topic_exchange", "routing_key_two", "routing_key_two" + localtime.now(), cid01);
 }
 //如果不存在,自动创建队列
 @rabbitlistener(queuestodeclare = @queue("test_queue"))
 public void receivermq(string msg) {
  log.info("接收到队列消息:{}", msg);
 }
  //如果不存在,自动创建队列和交换器并且绑定
 @rabbitlistener(bindings = {
   @queuebinding(value = @queue(value = "topic_queue01", durable = "true"),
     exchange = @exchange(value = "topic_exchange", type = exchangetypes.topic),
     key = "routing_key_one")})
 public void receivermqexchage(string msg, channel channel, message message) throws ioexception {

  long deliverytag = message.getmessageproperties().getdeliverytag();

  try {
   log.info("接收到topic_routing_key_one消息:{}", msg);
   //发生异常
   log.error("发生异常");
   int i = 1 / 0;
   //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
   channel.basicack(deliverytag, false);
  } catch (exception e) {
   log.error("接收消息失败,重新放回队列");
   //requeu,为true,代表重新放入队列多次失败重新放回会导致队列堵塞或死循环问题,
   // 解决方案,剔除此消息,然后记录到db中去补偿
   //channel.basicnack(deliverytag, false, true);
   //拒绝消息
   //channel.basicreject(deliverytag, true);
  }
 }

 @rabbitlistener(bindings = {
   @queuebinding(value = @queue(value = "topic_queue02", durable = "true"),
     exchange = @exchange(value = "topic_exchange", type = exchangetypes.topic),
     key = "routing_key_two")})
 public void receivermqexchagetwo(string msg) {
  log.info("接收到topic_routing_key_two消息:{}", msg);
 }


 @rabbitlistener(queues = rabbitmqconfig.fanout_queue_one)
 public void receivermqfanout(string msg, channel channel, message message) throws ioexception {
  long deliverytag = message.getmessageproperties().getdeliverytag();
  try {
   log.info("接收到队列fanout_queue_one消息:{}", msg);
   channel.basicack(deliverytag, false);
  } catch (exception e) {
   e.printstacktrace();
   //多次失败重新放回会导致队列堵塞或死循环问题 丢弃这条消息
//   channel.basicnack(message.getmessageproperties().getdeliverytag(), false, false);
   log.error("接收消息失败");
  }
 }

 @rabbitlistener(queues = rabbitmqconfig.fanout_queue_two)
 public void receivermqfanouttwo(string msg) {
  log.info("接收到队列fanout_queue_two消息:{}", msg);
 }

 /**
  * @return
  * @author lly
  * @description 确认消息是否发送到exchange
  * @date 2019-05-14 15:36
  * @param [correlationdata, ack, cause]
  **/
 @override
 public void confirm(correlationdata correlationdata, boolean ack, string cause) {
  log.info("消息唯一标识id:{}", correlationdata);
  log.info("消息确认结果!");
  log.error("消息失败原因,cause:{}", cause);
 }
 /**
  * @return
  * @author lly
  * @description 消息消费发生异常时返回
  * @date 2019-05-14 16:22
  * @param [message, replycode, replytext, exchange, routingkey]
  **/
 @override
 public void returnedmessage(message message, int replycode, string replytext, string exchange, string routingkey) {
  log.info("消息发送失败id:{}", message.getmessageproperties().getcorrelationid());
  log.info("消息主体 message : ", message);
  log.info("消息主体 message : ", replycode);
  log.info("描述:" + replytext);
  log.info("消息使用的交换器 exchange : ", exchange);
  log.info("消息使用的路由键 routing : ", routingkey);
 }
}

rabbitmq消息确认的三种方式

# 发送消息后直接确认消息
acknowledge-mode:none
# 根据消息消费的情况,智能判定消息的确认情况
acknowledge-mode:auto
# 手动确认消息的情况
acknowledge-mode:manual

我们以topic模式来试验下消息的ack

SpringBoot+RabbitMq具体使用的几种姿势

自动确认消息模式

SpringBoot+RabbitMq具体使用的几种姿势

SpringBoot+RabbitMq具体使用的几种姿势

手动确认消息模式

SpringBoot+RabbitMq具体使用的几种姿势

SpringBoot+RabbitMq具体使用的几种姿势

然后我们再次消费消息,发现消息是没有被确认的,所以可以被再次消费

SpringBoot+RabbitMq具体使用的几种姿势

发现同样的消息还是存在的没有被队列删除,必须手动去ack,我们修改队列1的手动ack看看效果

channel.basicack(deliverytag, false);

重启项目再次消费消息

SpringBoot+RabbitMq具体使用的几种姿势

再次查看队列里的消息,发现队列01里的消息被删除了,队列02的还是存在。

SpringBoot+RabbitMq具体使用的几种姿势

消费消息发生异常的情况,修改代码 模拟发生异常的情况下发生了什么, 异常发生了,消息被重放进了队列

SpringBoot+RabbitMq具体使用的几种姿势

但是会导致消息不停的循环消费,然后失败,致死循环调用大量服务器资源

SpringBoot+RabbitMq具体使用的几种姿势

所以我们正确的处理方式是,发生异常,将消息记录到db,再通过补偿机制来补偿消息,或者记录消息的重复次数,进行重试,超过几次后再放到db中。

总结

通过实际的code我们了解的rabbitmq在项目的具体的整合情况,消息ack的几种情况,方便在实际的场景中选择合适的方案来使用。如有不足,还望不吝赐教。希望对大家的学习有所帮助,也希望大家多多支持。