欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

RabbitMQ 基础知识总结

程序员文章站 2022-07-10 18:21:45
Hello World 模型 在下图中,“P”是我们的生产者,“C”是我们的消费者。中间的盒子是一个队列——RabbitMQ代表使用者保存的消息缓冲区生产者消费者连接对象封装2. Work queues 模型在下图中,“P”是我们的生产者,“C1”,“C2”是我们的两个消费者,。中间的盒子是一个队列——RabbitMQ代表使用者保存的消息缓冲区;没有设置这两句代码channel.basicQos(1)和channel.basicAck(envelope.getD.....
  1. Hello World 模型

   在下图中,“P”是我们的生产者,“C”是我们的消费者。中间的盒子是一个队列——RabbitMQ代表使用者保存的消息缓冲区

RabbitMQ 基础知识总结

生产者

public static void main(String[] args) throws IOException, TimeoutException {
    //创建工厂
    ConnectionFactory con=new ConnectionFactory();
    //设置连接那个主机
    con.setHost("47.115.44.94");
    //设置端口
    con.setPort(5672);
    //设置连接虚拟主机
    con.setVirtualHost("sem");
    //设置虚拟主机的用户名和密码
    con.setUsername("sem");
    con.setPassword("123");
    //获取连接对象
    Connection connection=con.newConnection();
    //Connection connection= RabbitmqConn.getConnection();//连接对象封装
    //获取连接通道
    Channel channel=connection.createChannel();
    //通道绑定对应消息队列
    // 参数1:队列客称 如果队列不存在自动创建
    // 参数2:用来定义队列特性是否夏持久化true持久化队列 false不持久化
    // 参数数3: exclusive是否独占队列 true独古队列 false 不独占
    // 参数数4: autoDelete:是否在消质完成后自动制除认列 true自动副除 false不自动删除
    // 参数数5:部外附加参数
    channel.queueDeclare("hello",false,false,false,null);
    //发布消息
    //参数1:交换机名称 参数2:队列名称 参数3:传递消息额外设置(例如设置:MessageProperties.PERSISTENT_TEXT_PLAIN 持久化队列的消息)
    // 参数4:消息的具体内容
    channel.basicPublish("","hello", null,"第一个".getBytes());
    channel.close();
    connection.close();
}

消费者

 

public static void main(String[] args) throws IOException, TimeoutException {
    //创建工厂
    ConnectionFactory con=new ConnectionFactory();
    //设置连接那个主机
    con.setHost("47.115.44.94");
    //设置端口
    con.setPort(5672);
    //设置连接虚拟主机
    con.setVirtualHost("sem");
    //设置虚拟主机的用户名和密码
    con.setUsername("sem");
    con.setPassword("123");
    //获取连接对象
    Connection connection=con.newConnection();
    // Connection connection= RabbitmqConn.getConnection();  连接对象封装
    //获取连接通道
    Channel channel=connection.createChannel();
    //通道绑定对应消息队列
    // 参数1:队列客称 如果队列不存在自动创建
    // 参数2:用来定义队列特性是否夏持久化true持久化队列 false不持久化
    // 参数数3: exclusive是否独占队列 true独古队列 false 不独占
    // 参数数4: autoDelete:是否在消质完成后自动制除认列 true自动副除 false不自动删除
    // 参数数5:部外附加参数
    channel.queueDeclare("hello",false,false,false,null);
    //消费消息
    //参数1:消费那个 队列的消息队列名称
    //参数2:开启消息的直动确认机
    //参数3:消费时的回调调接口
    channel.basicConsume("hello",true,new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                    System.out.println("---------->"+new String(body));
        }
    });
}

 

连接对象封装

public class RabbitmqConn {
    private static ConnectionFactory connectionFactory;
    static {
        //创建工厂
        connectionFactory=new ConnectionFactory();
        //设置连接那个主机
        connectionFactory.setHost("47.115.44.94");
        //设置端口
        connectionFactory.setPort(5672);
        //设置连接虚拟主机
        connectionFactory.setVirtualHost("sem");
        //设置虚拟主机的用户名和密码
        connectionFactory.setUsername("sem");
        connectionFactory.setPassword("123");
    }
    public static Connection getConnection() throws IOException, TimeoutException {
        //获取连接对象
        return connectionFactory.newConnection();
    }
}

2. Work queues 模型

在下图中,“P”是我们的生产者,“C1”,“C2”是我们的两个消费者,。中间的盒子是一个队列——RabbitMQ代表使用者保存的消息缓冲区;

没有设置这两句代码channel.basicQos(1)和channel.basicAck(envelope.getDeliveryTag(),false)之前,消费者消费消息是平均消费的,无论某个消费者消费的消息有多慢,也是平均消费,但是这种方式有时候不能够满足业务需求,我们需要的是能者多劳,那个消费者消费的快就消费多点,这样大大提高效了。

RabbitMQ 基础知识总结

生产者:

public static void main(String[] args) throws IOException, TimeoutException {
    //创建连接对象
    Connection connection = RabbitmqConn.getConnection();
    //连接通道
    Channel channel = connection.createChannel();
    //创建队列
    channel.queueDeclare("work",false,false,false,null);
    for (int i= 0;  i< 10; i++) {
        channel.basicPublish("","work",null,(i+"您好呀").getBytes());
    }
    //关闭连接
    channel.close();
    connection.close();

}

消费者1

public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = RabbitmqConn.getConnection();
    Channel channel = connection.createChannel();
    channel.basicQos(1);//每一次只能消费一个消息
    channel.queueDeclare("work",false,false,false,null);
    //第二个参数
    channel.basicConsume("work",false,new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("消费者1--------->"+new String(body));
            try {
                Thread.sleep(1000);
            }catch (Exception e){
                e.printStackTrace();
            }
            //参数1:确认队列中那个具体消息  参数2:是否开启3个消意同时确实
            channel.basicAck(envelope.getDeliveryTag(),false);
        }
    });
}

消费者2

public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = RabbitmqConn.getConnection();//连接对象
    Channel channel = connection.createChannel();//连接通道
    channel.basicQos(1);//每一次只能消费一个消息
    //创建队列
    channel.queueDeclare("work",false,false,false,null);
    channel.basicConsume("work",false,new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("消费者2--------->"+new String(body));
            //参数1:确认队列中那个具体消息  参数2:是否开启3个消意同时确实
          channel.basicAck(envelope.getDeliveryTag(),false);

        }
    });
}

 

3.Publish/Subscribe 模型(又称广播模型)

在下图中,“P”是我们的生产者,“X”代表转换机,“C1”,“C2”是我们的两个消费者。中间的盒子是队列

RabbitMQ 基础知识总结

在广播模式下:

消息发送流程是这样的:
1.可以有多个消费者
2.每个消费者有自己的queue(队列)
3.每个队列都要排定到Echange(交换机)
4.生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
5.交换机把消息发送给绑定过的所有队
6.列队列的消费者都能率到消息,实现一条消息被多个消费者消费

生产者

public static void main(String[] args) throws IOException, TimeoutException {
     //获取连接对象
    Connection connection = RabbitmqConn.getConnection();
    //创建连接通道
    Channel channel=connection.createChannel();
    //指定交换机,参数1:交换机名称,参数2 :交换机类型 ,fanout 广播类型
    channel.exchangeDeclare("logs","fanout");
    //发送消息
    channel.basicPublish("logs","",null,"logs out to  for".getBytes());
    channel.close();
    connection.close();
}

消费者(你可以模拟多个消费者测试,这里就不一一写了)

public static void main(String[] args) throws IOException, TimeoutException {
    //创建连接对象
    Connection connection = RabbitmqConn.getConnection();
    //创建连接通道
    Channel channel = connection.createChannel();
    //通道绑定交换机
    channel.exchangeDeclare("logs","fanout");
    //临时通道
    String queueName = channel.queueDeclare().getQueue();
    //绑定交换机与队列
    channel.queueBind(queueName,"logs","");
    channel.basicConsume(queueName,true,new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("消费1---》"+ new String(body));
        }
    });
}

 

 

4.Routing 模型

在fanout模式中,一条消息,会被所有订购的队列都消费,但是,在 某些 场景下我们希望不同的消息被不同的消息队列消费这时就要用到Direct类型的Exchange;

Direct模型下:
1.队列与交换机的绑定,不能是任意讲定了,而是要指定一个Routingkey (路由key),
2.消息的发送方在向Echange发送消息时,也必须指定消息的Routingkey
3.Exchang不再把消息交给每一个排定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的Routing key完全一致,才会接收到消息;

RabbitMQ 基础知识总结

生产者

public static void main(String[] args) throws IOException, TimeoutException {
    //创建连接对象
    Connection connection = RabbitmqConn.getConnection();
    //创建连接通道
    Channel channel = connection.createChannel();
    //交换机
    channel.exchangeDeclare("hell_direct","direct");
    //发送消息
    String routingkey="error";
    channel.basicPublish("hell_direct",routingkey,null,"这是direst".getBytes());
    channel.close();
    connection.close();
}

消费者1

public static void main(String[] args) throws IOException, TimeoutException {
    //创建连接对象
    Connection connection = RabbitmqConn.getConnection();
    //创建通道连接
    Channel channel = connection.createChannel();
    //连接的交换机
    channel.exchangeDeclare("hell_direct","direct");
    //临时队列
    String queue = channel.queueDeclare().getQueue();
    //交换机与临时队列连接
    channel.queueBind(queue,"hell_direct","error");
    //消费消息
    channel.basicConsume(queue,true,new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("消费者1--->"+new String(body));
        }
    });
}

消费者2

public static void main(String[] args) throws IOException, TimeoutException {
    //创建连接对象
    Connection connection = RabbitmqConn.getConnection();
    //创建通道连接
    Channel channel = connection.createChannel();
    //连接的交换机
    channel.exchangeDeclare("hell_direct","direct");
    //临时队列
    String queue = channel.queueDeclare().getQueue();
    //交换机与临时队列连接
    channel.queueBind(queue,"hell_direct","error");
    channel.queueBind(queue,"hell_direct","info");
    channel.queueBind(queue,"hell_direct","warning");
    //消费消息
    channel.basicConsume(queue,true,new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("消费者 2--->"+new String(body));
        }
    });
}

 

输出结果:

都可以消费者1和消费者2都可以消费消息,因为生产者rountingkey设置为error,

如果生产者rountingkey设置为infor,那么消费者1不能消费消息,而消费者可以消费消息

5.topic 模型

Topic类型的Exchange与Direct相比,都是可以根据Routingkey把消息路由到不同的队列,
只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配特“*”和“*”
这种模型Routingkey一般都是由一个或多个单词组成,多个单词之间以"."分割,例如: stes.insert

 *(星号)只能代替一个单词。

#(井号)可以替换零个或多个单词。

RabbitMQ 基础知识总结

生产者

public static void main(String[] args) throws IOException, TimeoutException {
    //创建连接对象
    Connection connection = RabbitmqConn.getConnection();
    //创建连接通道
    Channel channel = connection.createChannel();
    //交换机
    channel.exchangeDeclare("topics","topic");
    //发送消息
    String routingkey="user.save.hello";
    channel.basicPublish("topics",routingkey,null,"这是topic".getBytes());
    channel.close();
    connection.close();

}

消费者1

public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = RabbitmqConn.getConnection();
    //创建通道连接
    Channel channel = connection.createChannel();
    //连接的交换机
    channel.exchangeDeclare("topics","topic");
    //临时队列
    String queue = channel.queueDeclare().getQueue();
    //交换机与临时队列连接
    channel.queueBind(queue,"topics","user.*");
    //消费消息
    channel.basicConsume(queue,true,new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("topic--->"+new String(body));
        }
    });
}

消费者2

public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = RabbitmqConn.getConnection();
    //创建通道连接
    Channel channel = connection.createChannel();
    //连接的交换机
    channel.exchangeDeclare("topics","topic");
    //临时队列
    String queue = channel.queueDeclare().getQueue();
    //交换机与临时队列连接
    channel.queueBind(queue,"topics","user.#");
    //消费消息
    channel.basicConsume(queue,true,new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws
                IOException {
            System.out.println("topic--->"+new String(body));
        }
    });
}

 

输出结果:

消费者1 不能消费消息(因为*只能代替一个单词),而消费者2可以消费消息(因为#可以替换零个或多个单词)

本文地址:https://blog.csdn.net/weixin_44544678/article/details/109578831

相关标签: RabbitMQ