Java 远程连接调用 RabbitMQ 的实例代码
程序员文章站
2023-08-16 08:30:16
本文介绍如何用 Java 远程连接并调用 RabbitMQ,分享给大家,希望对各位有所帮助。
打开 IntelliJ IDEA,创建一个 Maven 工程(普通的 Java 工程即可)。
...
本文介绍如何用 Java 远程连接并调用 RabbitMQ,分享给大家,希望对各位有所帮助。
打开 IntelliJ IDEA,创建一个 Maven 工程(普通的 Java 工程即可)。
pom.xml文件如下
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <!-- Maven element names and namespaces are case-sensitive: modelVersion, groupId, artifactId, ... -->
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.zhenqi</groupId>
  <artifactId>rabbitmq-study</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>rabbitmq-study</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>4.1.0</version>
      <exclusions>
        <!-- slf4j-api is excluded here; the slf4j-log4j12 binding below pulls in
             its own matching slf4j-api (and log4j 1.x) transitively. -->
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- SLF4J-to-log4j binding; also provides org.apache.log4j.Logger used by the consumer. -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.21</version>
    </dependency>

    <!-- Provides SerializationUtils used to (de)serialize message payloads. -->
    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.6</version>
    </dependency>
  </dependencies>
</project>
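为了能远程访问 RabbitMQ,需要编辑 Erlang 项格式的配置文件 /etc/rabbitmq/rabbitmq.config(RabbitMQ 3.7 及以上版本中对应 /etc/rabbitmq/advanced.config;rabbitmq.conf 是另一种 ini 风格的格式),添加以下内容,然后重启 rabbitmq-server 使配置生效。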
%% Classic (Erlang-term) RabbitMQ configuration.
%%   tcp_listeners  - listen for AMQP connections on port 5672.
%%   loopback_users - users that may only connect via localhost.
%% NOTE(review): "asdf" looks like a placeholder so that no real user is
%% loopback-restricted; the documented form for that is {loopback_users, []} - confirm.
%% The terminating period after the closing bracket is required by the Erlang term parser.
[ {rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["asdf"]}]} ].
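为用户 openstack 添加 administrator 角色(该用户需事先用 rabbitmqctl add_user openstack rabbitmq 创建,并用 rabbitmqctl set_permissions -p / openstack ".*" ".*" ".*" 授予虚拟主机 / 的权限):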
# Tag the user "openstack" with the "administrator" tag.
# NOTE(review): assumes the user "openstack" already exists (rabbitmqctl add_user) - confirm.
rabbitmqctl set_user_tags openstack administrator
创建抽象队列 endpoint.java
package com.zhenqi; import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionfactory; /** * created by wuming on 2017/7/16. */ public abstract class endpoint { protected channel channel; protected connection connection; protected string endpointname; public endpoint(string endpointname) throws exception { this.endpointname = endpointname; //创建一个连接工厂 connection factory connectionfactory factory = new connectionfactory(); //设置rabbitmq-server服务ip地址 factory.sethost("192.168.146.128"); factory.setusername("openstack"); factory.setpassword("rabbitmq"); factory.setport(5672); factory.setvirtualhost("/"); //得到 连接 connection = factory.newconnection(); //创建 channel实例 channel = connection.createchannel(); channel.queuedeclare(endpointname, false, false, false, null); } /** * 关闭channel和connection。并非必须,因为隐含是自动调用的。 * @throws ioexception */ public void close() throws exception{ this.channel.close(); this.connection.close(); } }
生产者producer.java
生产者类的任务是向队列里写一条消息
package com.zhenqi;

import org.apache.commons.lang.SerializationUtils;

import java.io.Serializable;

/**
 * Publishes serialized objects to the queue this endpoint is bound to.
 *
 * <p>Created by wuming on 2017/7/16.
 */
public class Producer extends Endpoint {

    /**
     * Connects to the broker and declares the target queue.
     *
     * @param endpointName name of the queue to publish to
     * @throws Exception if the connection or queue declaration fails
     */
    public Producer(String endpointName) throws Exception {
        super(endpointName);
    }

    /**
     * Serializes {@code object} with Java serialization and publishes it to the queue.
     *
     * @param object the message payload
     * @throws Exception if publishing fails
     */
    public void sendMessage(Serializable object) throws Exception {
        // Empty exchange name = default exchange, which routes by queue name.
        channel.basicPublish("", endpointName, null, SerializationUtils.serialize(object));
    }
}
消费者queueconsumer.java
消费者可以以线程方式运行,对于不同的事件有不同的回调函数,其中最主要的是处理新消息到来的事件。
package com.zhenqi;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import org.apache.commons.lang.SerializationUtils;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.Map;

/**
 * Consumes messages from the queue this endpoint is bound to and logs each one.
 *
 * <p>{@link #run()} registers this object as the queue's consumer; the client library then
 * invokes the {@link Consumer} callbacks below as events arrive.
 *
 * <p>Created by wuming on 2017/7/16.
 */
public class QueueConsumer extends Endpoint implements Runnable, Consumer {

    private static final Logger log = Logger.getLogger(QueueConsumer.class);

    /**
     * Connects to the broker and declares the queue to consume from.
     *
     * @param endpointName name of the queue to consume from
     * @throws Exception if the connection or queue declaration fails
     */
    public QueueConsumer(String endpointName) throws Exception {
        super(endpointName);
    }

    /**
     * Called once this consumer has been registered with the broker.
     *
     * @param consumerTag tag identifying this consumer
     */
    @Override
    public void handleConsumeOk(String consumerTag) {
        log.info("consumer " + consumerTag + " registered");
    }

    /** Called when the consumer is cancelled by an explicit cancel request; nothing to do. */
    @Override
    public void handleCancelOk(String consumerTag) {
    }

    /** Called when the consumer is cancelled for any other reason; nothing to do. */
    @Override
    public void handleCancel(String consumerTag) throws IOException {
    }

    /** Called when the channel or connection shuts down; nothing to do. */
    @Override
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
    }

    /** Called when a recover request has been acknowledged; nothing to do. */
    @Override
    public void handleRecoverOk(String consumerTag) {
    }

    /**
     * Called for every delivered message: deserializes the payload as a map and logs its
     * {@code "message number"} entry.
     *
     * @param consumerTag tag identifying this consumer
     * @param envelope    delivery metadata
     * @param properties  message properties
     * @param body        serialized message payload
     * @throws IOException declared by the {@link Consumer} contract
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        // NOTE(review): Java-native deserialization must only be used on trusted queue contents.
        Map<?, ?> message = (Map<?, ?>) SerializationUtils.deserialize(body);
        log.info("message number " + message.get("message number") + " received.");
    }

    /** Registers this object as an auto-acknowledging consumer of the queue. */
    @Override
    public void run() {
        try {
            // Second argument true = autoAck: deliveries are acknowledged automatically.
            channel.basicConsume(endpointName, true, this);
        } catch (IOException e) {
            log.error("failed to start consuming from " + endpointName, e);
        }
    }
}
测试
运行一个消费者线程,然后开始产生大量的消息,这些消息会被消费者取走
package com.zhenqi;

import java.util.HashMap;

/**
 * Demo entry point: starts a consumer on the queue {@code "queue"}, then publishes
 * 100000 numbered messages to the same queue.
 *
 * <p>Created by wuming on 2017/7/16.
 */
public class TestRabbitMq {

    /**
     * Runs the demo.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            // The consumer is intentionally left open so it keeps receiving messages.
            QueueConsumer consumer = new QueueConsumer("queue");
            Thread consumerThread = new Thread(consumer);
            consumerThread.start();

            Producer producer = new Producer("queue");
            try {
                for (int i = 0; i < 100000; i++) {
                    HashMap<String, Integer> message = new HashMap<String, Integer>();
                    message.put("message number", i);
                    producer.sendMessage(message);
                    System.out.println("message number " + i + " sent.");
                }
            } finally {
                // Release the producer's channel and connection once publishing is done.
                producer.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。