欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

springboot整合rabbitmq的示例代码

程序员文章站 2023-12-04 11:50:16
概述 rabbitmq是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,或者简单地将作业队列以便让分布式服务器进行处理。 它现...

概述

  1. rabbitmq是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,或者简单地将作业队列以便让分布式服务器进行处理。
  2. 它现实了amqp协议,并且遵循mozilla public license开源协议,它支持多种语言,可以方便的和spring集成。
  3. 消息队列使用消息将应用程序连接起来,这些消息通过像rabbitmq这样的消息代理服务器在应用程序之间路由。

基本概念

broker

用来处理数据的消息队列服务器实体

vhost

由rabbitmq服务器创建的虚拟消息主机,拥有自己的权限机制,一个broker里可以开设多个vhost,用于不同用户的权限隔离,vhost之间是也完全隔离的。

productor

产生用于消息通信的数据

channel

消息通道,在amqp中可以建立多个channel,每个channel代表一个会话任务。

exchange

direct

转发消息到routing-key指定的队列

springboot整合rabbitmq的示例代码fanout

fanout

转发消息到所有绑定的队列,类似于一种广播发送的方式。

springboot整合rabbitmq的示例代码topic

topic

按照规则转发消息,这种规则多为模式匹配,也显得更加灵活

springboot整合rabbitmq的示例代码queue

queue

  1.  队列是rabbitmq的内部对象,存储消息
  2. 以动态的增加消费者,队列将接受到的消息以轮询(round-robin)的方式均匀的分配给多个消费者。

binding

表示交换机和队列之间的关系,在进行绑定时,带有一个额外的参数binding-key,来和routing-key相匹配。

consumer

监听消息队列来进行消息数据的读取

springboot下三种exchange模式(fanout,direct,topic)实现

pom.xml中引用spring-boot-starter-amqp

<dependency>
  <groupid>org.springframework.boot</groupid>
  <artifactid>spring-boot-starter-amqp</artifactid>
</dependency>

增加rabbitmq配置

spring: 
 rabbitmq: 
 host: localhost 
 port: 5672 
 username: guest 
 password: guest

direct

direct模式一般情况下只需要定义queue 使用自带交换机(defaultexchange)无需绑定交换机

  @configuration
public class rabbitp2pconfigure {  
 public static final string queue_name = "p2p-queue";
  @bean
  public queue queue() {
    return new queue(queue_name, true);
  }

}
@runwith(springrunner.class)
@springboottest(classes = bootcoretestapplication.class)
@slf4j
public class rabbittest {
  @autowired
  private amqptemplate amqptemplate;

  /**
  * 发送
  */
  @test
  public void sendlazy() throws interruptedexception {
    city city = new city(234556666l, "direct_name", "direct_code");
    amqptemplate.convertandsend(rabbitlazyconfigure.queue_name, city);
  }
  
  /**
  * 领取
  */
  @test
  public void receive() throws interruptedexception {
    object obj = amqptemplate.receiveandconvert(rabbitlazyconfigure.queue_name);
    assert.notnull(obj, "");
    log.debug(obj.tostring());
  }
}

适用场景:点对点

fanout

fanout则模式需要将多个queue绑定在同一个交换机上

@configuration
public class rabbitfanoutconfigure {
  public static final string exchange_name = "fanout-exchange";
  public static final string fanout_a = "fanout.a";
  public static final string fanout_b = "fanout.b";
  public static final string fanout_c = "fanout.c";

  @bean
  public queue amessage() {
    return new queue(fanout_a);
  }

  @bean
  public queue bmessage() {
    return new queue(fanout_b);
  }

  @bean
  public queue cmessage() {
    return new queue(fanout_c);
  }

  @bean
  public fanoutexchange fanoutexchange() {
    return new fanoutexchange(exchange_name);
  }

  @bean
  public binding bindingexchangea(queue amessage, fanoutexchange fanoutexchange) {
    return bindingbuilder.bind(amessage).to(fanoutexchange);
  }

  @bean
  public binding bindingexchangeb(queue bmessage, fanoutexchange fanoutexchange) {
    return bindingbuilder.bind(bmessage).to(fanoutexchange);
  }

  @bean
  public binding bindingexchangec(queue cmessage, fanoutexchange fanoutexchange) {
    return bindingbuilder.bind(cmessage).to(fanoutexchange);
  }

}

发送者

@slf4j
public class sender {

  @autowired
  private amqptemplate rabbittemplate;

  public void sendfanout(object message) {
    log.debug("begin send fanout message<" + message + ">");
    rabbittemplate.convertandsend(rabbitfanoutconfigure.exchange_name, "", message);
  }

}

我们可以通过@rabbitlistener监听多个queue来进行消费

@slf4j
@rabbitlistener(queues = {
    rabbitfanoutconfigure.fanout_a,
    rabbitfanoutconfigure.fanout_b,
    rabbitfanoutconfigure.fanout_c
})
public class receiver {

  @rabbithandler
  public void receivemessage(string message) {
    log.debug("received <" + message + ">");
  }
}

适用场景
- 大规模多用户在线(mmo)游戏可以使用它来处理排行榜更新等全局事件
- 体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端
- 分发系统使用它来广播各种状态和配置更新
- 在群聊的时候,它被用来分发消息给参与群聊的用户

topic

这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”,exchange会将消息转发到所有关注主题能与routekey模糊匹配的队列。

在进行绑定时,要提供一个该队列关心的主题,如“topic.# (“#”表示0个或若干个关键字,“*”表示一个关键字。 )

@configuration
public class rabbittopicconfigure {
  public static final string exchange_name = "topic-exchange";
  public static final string topic = "topic";
  public static final string topic_a = "topic.a";
  public static final string topic_b = "topic.b";

  @bean
  public queue queuetopic() {
    return new queue(rabbittopicconfigure.topic);
  }

  @bean
  public queue queuetopica() {
    return new queue(rabbittopicconfigure.topic_a);
  }

  @bean
  public queue queuetopicb() {
    return new queue(rabbittopicconfigure.topic_b);
  }

  @bean
  public topicexchange exchange() {
    topicexchange topicexchange = new topicexchange(exchange_name);
    topicexchange.setdelayed(true);
    return new topicexchange(exchange_name);
  }

  @bean
  public binding bindingexchangetopic(queue queuetopic, topicexchange exchange) {
    return bindingbuilder.bind(queuetopic).to(exchange).with(rabbittopicconfigure.topic);
  }

  @bean
  public binding bindingexchangetopics(queue queuetopica, topicexchange exchange) {
    return bindingbuilder.bind(queuetopica).to(exchange).with("topic.#");
  }
}

同时去监听三个queue

@slf4j
@rabbitlistener(queues = {
    rabbittopicconfigure.topic,
    rabbittopicconfigure.topic_a,
    rabbittopicconfigure.topic_b
})
public class receiver {
  @rabbithandler
  public void receivemessage(string message) {
    log.debug("received <" + message + ">");
  }
}

通过测试我们可以发现

@runwith(springrunner.class)
@springboottest(classes = bootcoretestapplication.class)
public class rabbittest {
  @autowired
  private amqptemplate rabbittemplate;

  @test
  public void sendall() {
    rabbittemplate.convertandsend(rabbittopicconfigure.exchange_name, "topic.test", "send all");
  }

  @test
  public void sendtopic() {
    rabbittemplate.convertandsend(rabbittopicconfigure.exchange_name, rabbittopicconfigure.topic, "send topic");
  }

  @test
  public void sendtopica() {
    rabbittemplate.convertandsend(rabbittopicconfigure.exchange_name, rabbittopicconfigure.topic_a, "send topica");
  }
}

适用场景
- 分发有关于特定地理位置的数据,例如销售点
- 由多个工作者(workers)完成的后台任务,每个工作者负责处理某些特定的任务
- 股票价格更新(以及其他类型的金融数据更新)
- 涉及到分类或者标签的新闻更新(例如,针对特定的运动项目或者队伍)
- 云端的不同种类服务的协调
- 分布式架构/基于系统的软件封装,其中每个构建者仅能处理一个特定的架构或者系统。

延迟队列

延迟消费:

  1. 如用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单。
  2. 用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。

延迟重试:

  1. 如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试。
  2. 如果不使用延迟队列,那么我们只能通过一个轮询扫描程序去完成。这种方案既不优雅,也不方便做成统一的服务便于开发人员使用。但是使用延迟队列的话,我们就可以轻而易举地完成。

设置交换机延迟属性为true

@configuration
public class rabbitlazyconfigure {
  public static final string queue_name = "lazy-queue-t";
  public static final string exchange_name = "lazy-exchange-t";

  @bean
  public queue queue() {
    return new queue(queue_name, true);
  }

  @bean
  public directexchange defaultexchange() {
    directexchange directexchange = new directexchange(exchange_name, true, false);
    directexchange.setdelayed(true);
    return directexchange;
  }

  @bean
  public binding binding() {
    return bindingbuilder.bind(queue()).to(defaultexchange()).with(queue_name);
  }

}

发送时设置延迟时间即可

@slf4j
public class sender {
  @autowired
  private amqptemplate rabbittemplate;
  public void sendlazy(object msg) {
    log.debug("begin send lazy message<" + msg + ">");
    rabbittemplate.convertandsend(rabbitlazyconfigure.exchange_name,
        rabbitlazyconfigure.queue_name, msg, message -> {
          message.getmessageproperties().setheader("x-delay", 10000);
          return message;
        }
    );
  }
}

结束

各种使用案例请直接查看

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。