
Example code for connecting to RabbitMQ remotely from Java

程序员文章站 2023-08-16 08:30:16

This article shows how to connect to a remote RabbitMQ server from Java and exchange messages with it. I hope it is useful to you.

Open IDEA and create a Maven project (a plain Java project is enough).


The pom.xml file is as follows:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>

 <groupId>com.zhenqi</groupId>
 <artifactId>rabbitmq-study</artifactId>
 <version>1.0-SNAPSHOT</version>
 <packaging>jar</packaging>

 <name>rabbitmq-study</name>
 <url>http://maven.apache.org</url>

 <properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 </properties>

 <dependencies>
  <dependency>
   <groupId>junit</groupId>
   <artifactId>junit</artifactId>
   <version>4.12</version>
   <scope>test</scope>
  </dependency>

  <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
  <dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>4.1.0</version>
   <exclusions>
    <exclusion>
     <groupId>org.slf4j</groupId>
     <artifactId>slf4j-api</artifactId>
    </exclusion>
   </exclusions>
  </dependency>

  <dependency>
   <groupId>org.slf4j</groupId>
   <artifactId>slf4j-log4j12</artifactId>
   <version>1.7.21</version>
  </dependency>

  <dependency>
   <groupId>commons-lang</groupId>
   <artifactId>commons-lang</artifactId>
   <version>2.6</version>
  </dependency>

 </dependencies>
</project>

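To allow remote access to RabbitMQ, edit /etc/rabbitmq/rabbitmq.config (the classic Erlang-term configuration file; the newer rabbitmq.conf uses a different key = value syntax) and add the following. tcp_listeners sets the port the broker listens on, and loopback_users lists the users that may connect only from localhost.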

[
  {rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["asdf"]}]}
].

Give the user the administrator tag:

rabbitmqctl set_user_tags openstack administrator
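Before moving on to the client code, it can help to confirm that port 5672 is actually reachable from the remote machine. The following is a minimal sketch using only the JDK; the class name PortCheck is hypothetical and not part of this project, and the default host is simply the address used in the code later in this article. It only verifies that a TCP connection can be opened, not that the user name and password are accepted.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper, not part of the article's project. */
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unresolvable: the broker port is not reachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed default: the broker address used in this article; pass another host as an argument.
        String host = args.length > 0 ? args[0] : "192.168.146.128";
        System.out.println("5672 reachable: " + isReachable(host, 5672, 2000));
    }
}
```

If this reports false, a firewall rule or the listener configuration is the more likely cause than anything in the Java client.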

Create the abstract queue endpoint class, EndPoint.java

package com.zhenqi;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


/**
 * Created by wuming on 2017/7/16.
 */
public abstract class EndPoint {

  protected Channel channel;
  protected Connection connection;
  protected String endPointName;

  public EndPoint(String endPointName) throws Exception {

    this.endPointName = endPointName;

    // Create a connection factory
    ConnectionFactory factory = new ConnectionFactory();

    // Set the address and credentials of the rabbitmq-server host
    factory.setHost("192.168.146.128");
    factory.setUsername("openstack");
    factory.setPassword("rabbitmq");
    factory.setPort(5672);
    factory.setVirtualHost("/");

    // Open the connection
    connection = factory.newConnection();

    // Create a channel on the connection
    channel = connection.createChannel();

    // Declare the queue (non-durable, non-exclusive, not auto-deleted);
    // it is created if it does not exist yet
    channel.queueDeclare(endPointName, false, false, false, null);
  }

  /**
   * Close the channel and the connection. Closing the channel explicitly is
   * optional, because closing the connection also closes its channels.
   * @throws Exception if closing fails
   */
  public void close() throws Exception {
    this.channel.close();
    this.connection.close();
  }
}
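The constructor above hard-codes the broker address and credentials. One possible alternative, sketched here under assumptions, is to read them from environment variables with fallbacks. The class BrokerSettings and the variable names RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD and RABBITMQ_VHOST are all hypothetical; nothing in RabbitMQ or the Java client defines them.

```java
import java.util.Map;

/** Hypothetical settings holder; the environment variable names are assumptions. */
final class BrokerSettings {
    final String host;
    final int port;
    final String username;
    final String password;
    final String virtualHost;

    private BrokerSettings(String host, int port, String username,
                           String password, String virtualHost) {
        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
        this.virtualHost = virtualHost;
    }

    /** Builds settings from the given variables, falling back to local defaults. */
    static BrokerSettings fromEnvironment(Map<String, String> env) {
        return new BrokerSettings(
            env.getOrDefault("RABBITMQ_HOST", "localhost"),
            Integer.parseInt(env.getOrDefault("RABBITMQ_PORT", "5672")),
            env.getOrDefault("RABBITMQ_USERNAME", "guest"),
            env.getOrDefault("RABBITMQ_PASSWORD", "guest"),
            env.getOrDefault("RABBITMQ_VHOST", "/"));
    }

    public static void main(String[] args) {
        BrokerSettings settings = fromEnvironment(System.getenv());
        System.out.println(settings.host + ":" + settings.port + " vhost=" + settings.virtualHost);
    }
}
```

In the endpoint constructor, the values would then be passed on with calls such as factory.setHost(settings.host) and factory.setPort(settings.port), which keeps the password out of the source code.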

The producer, Producer.java

The producer class is responsible for writing a message to the queue.

package com.zhenqi;

import org.apache.commons.lang.SerializationUtils;

import java.io.Serializable;

/**
 * Created by wuming on 2017/7/16.
 */
public class Producer extends EndPoint {

  public Producer(String endPointName) throws Exception {
    super(endPointName);
  }

  public void sendMessage(Serializable object) throws Exception {
    // Publish to the default exchange (""), using the queue name as the routing key
    channel.basicPublish("", endPointName, null, SerializationUtils.serialize(object));
  }
}
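The message body sent to the broker is just a byte array. To make that step concrete, here is a sketch of the same round trip done with the JDK's built-in object serialization, which, as far as I know, is what SerializationUtils wraps. The class SerializationSketch is hypothetical and exists only for illustration; the project itself keeps using SerializationUtils.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;

/** Hypothetical illustration of turning a message into bytes and back. */
final class SerializationSketch {

    /** Serializes the object into a byte array suitable for a message body. */
    static byte[] serialize(Serializable object) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(object);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The stream is closed (and therefore flushed) before the bytes are read.
        return bytes.toByteArray();
    }

    /** Rebuilds the object from a byte array produced by serialize. */
    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        HashMap<String, Integer> message = new HashMap<>();
        message.put("message number", 1);
        byte[] body = serialize(message);
        System.out.println(body.length + " bytes, decoded: " + deserialize(body));
    }
}
```

Java serialization ties both sides to the same classes, so it suits a demo like this one where producer and consumer live in the same project.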

The consumer, QueueConsumer.java

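The consumer can run in its own thread. It has a separate callback for each kind of event, the most important being the one that handles the arrival of a new message.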

package com.zhenqi;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import org.apache.commons.lang.SerializationUtils;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by wuming on 2017/7/16.
 */
public class QueueConsumer extends EndPoint implements Runnable, Consumer {

  private static final Logger log = Logger.getLogger(QueueConsumer.class);

  public QueueConsumer(String endPointName) throws Exception {
    super(endPointName);
  }

  // Called once the consumer has been registered with the broker
  public void handleConsumeOk(String consumerTag) {
    log.info("consumer " + consumerTag + " registered");
  }

  public void handleCancelOk(String consumerTag) {

  }

  public void handleCancel(String consumerTag) throws IOException {

  }

  public void handleShutdownSignal(String consumerTag, ShutdownSignalException e) {

  }

  public void handleRecoverOk(String consumerTag) {

  }

  // Called whenever a new message is delivered
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    Map<?, ?> map = (HashMap<?, ?>) SerializationUtils.deserialize(body);
    log.info("message number " + map.get("message number") + " received.");
  }

  public void run() {
    try {
      // Start consuming; autoAck is true, so messages are acknowledged on delivery
      channel.basicConsume(endPointName, true, this);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}

Testing

Start a consumer thread, then produce a large number of messages; the consumer picks them up.

package com.zhenqi;

import java.util.HashMap;

/**
 * Created by wuming on 2017/7/16.
 */
public class TestRabbitMQ {

  public static void main(String[] args) {
    try {
      // Start the consumer in its own thread
      QueueConsumer consumer = new QueueConsumer("queue");
      Thread consumerThread = new Thread(consumer);
      consumerThread.start();

      Producer producer = new Producer("queue");

      // Publish 100000 numbered messages to the same queue
      for (int i = 0; i < 100000; i++) {
        HashMap<String, Integer> message = new HashMap<String, Integer>();
        message.put("message number", i);
        producer.sendMessage(message);
        System.out.println("message number " + i + " sent.");
      }
    } catch (Exception e) {
      e.printStackTrace();
    }

  }
}
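The test program above never finds out when the consumer has received everything, and it never closes its connections. One way to handle this, sketched here under assumptions, is to count deliveries with a CountDownLatch and wait on it. The sketch replaces the broker with an in-memory BlockingQueue so that it runs on its own; the class LatchSketch is hypothetical and not part of this project.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** In-memory stand-in for the broker; it only illustrates waiting for the consumer. */
final class LatchSketch {

    /** Sends messageCount integers through a local queue; returns true if all were consumed in time. */
    static boolean runAndWait(int messageCount, long timeoutSeconds) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        CountDownLatch allReceived = new CountDownLatch(messageCount);

        Thread consumer = new Thread(() -> {
            try {
                while (allReceived.getCount() > 0) {
                    queue.take();            // blocks until a message arrives
                    allReceived.countDown(); // one fewer message outstanding
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        for (int i = 0; i < messageCount; i++) {
            queue.add(i); // the queue is unbounded, so add never fails for lack of space
        }

        try {
            return allReceived.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("all consumed: " + runAndWait(100000, 10));
    }
}
```

With the real client, the countDown call would sit in the delivery callback, and main would call close() on the producer and the consumer once await returns.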

That is all for this article. I hope it helps with your studies.