====== Integrating Spring with RabbitMQ ======

===== Official site =====

[[http://www.rabbitmq.com/getstarted.html|Official RabbitMQ tutorials]]

===== Three exchange modes =====

  * direct mode {{:分享:技术:jms:direct.jpeg?300|}}
  * fanout mode (publish/subscribe) {{:分享:技术:jms:fanout.jpeg?300|}}
  * topic mode (pattern matching) {{:分享:技术:jms:topic.jpeg?500|}}

===== Server side =====

==== pom.xml ====

<code xml pom.xml>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.3.5.RELEASE</version>
</dependency>
</code>

==== application-mq-rabbit.xml ====

<code xml application-mq-rabbit.xml>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:task="http://www.springframework.org/schema/task"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd
        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- MQ connection settings; replace host and credentials with those of your own broker -->
    <rabbit:connection-factory id="connectionFactory" host="121.40.171.96" username="admin" password="admin123" port="5672" />
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- direct mode: start -->
    <!-- template: publishes to direct.exchange by default -->
    <rabbit:template id="directTemplate"
        connection-factory="connectionFactory" exchange="direct.exchange" />
    <!-- queue -->
    <rabbit:queue name="server.to.client.direct" durable="true" auto-delete="false" exclusive="false" />
    <!-- exchange -->
    <rabbit:direct-exchange name="direct.exchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <!-- binding: routing key to queue -->
            <rabbit:binding queue="server.to.client.direct" key="server.to.client.direct.key" />
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <!-- direct mode: end -->

    <!-- fanout mode (publish/subscribe): start -->
    <!-- template: publishes to fanout.exchange by default -->
    <rabbit:template id="fanoutTemplate" connection-factory="connectionFactory" exchange="fanout.exchange" />
    <!-- queues -->
    <rabbit:queue name="server.to.client.fanout1" durable="true" />
    <rabbit:queue name="server.to.client.fanout2" durable="true" />
    <!-- exchange -->
    <rabbit:fanout-exchange name="fanout.exchange" durable="true">
        <rabbit:bindings>
            <!-- bindings: a fanout exchange ignores routing keys -->
            <rabbit:binding queue="server.to.client.fanout1" />
            <rabbit:binding queue="server.to.client.fanout2" />
        </rabbit:bindings>
    </rabbit:fanout-exchange>
    <!-- fanout mode (publish/subscribe): end -->

    <!-- topic mode (pattern matching): start -->
    <!-- template: publishes to topic.exchange by default -->
    <rabbit:template id="topicTemplate" connection-factory="connectionFactory" exchange="topic.exchange" />
    <!-- queue -->
    <rabbit:queue name="server.to.client.topic" durable="true" />
    <!-- exchange -->
    <rabbit:topic-exchange name="topic.exchange" durable="true">
        <rabbit:bindings>
            <!-- binding: routing-key pattern to queue -->
            <rabbit:binding queue="server.to.client.topic" pattern="*.*.*.topic" />
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!-- topic mode (pattern matching): end -->
</beans>
</code>

==== RabbitmqController.java ====

<code java RabbitmqController.java>
package com.gxx.dubbo.server.web;

import java.util.HashMap;
import java.util.Map;

import javax.servlet.http.HttpServletRequest;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import com.alibaba.fastjson.JSON;

/**
 * RabbitMQ controller.
 * @author Gxx
 */
@Controller
@RequestMapping("/rabbitmq/")
public class RabbitmqController {
    /** Logger. */
    private final Logger logger = Logger.getLogger(RabbitmqController.class);

    // Three beans share the AmqpTemplate type, so each field name
    // must match the id of the template bean it should receive.

    /** Template for the direct exchange. */
    @Autowired
    private AmqpTemplate directTemplate;

    /** Template for the fanout exchange. */
    @Autowired
    private AmqpTemplate fanoutTemplate;

    /** Template for the topic exchange. */
    @Autowired
    private AmqpTemplate topicTemplate;

    /**
     * Sends five messages in direct mode.
     * @return a completion message
     */
    @RequestMapping(value = "/sendDirect")
    @ResponseBody
    public String sendDirect(HttpServletRequest request) {
        logger.info("发送direct模式mq开始~");
        for (int i = 0; i < 5; i++) {
            logger.info("发送第[" + (i + 1) + "]条mq!");
            // Build the payload map
            Map<String, Object> map = new HashMap<>();
            map.put("index", i + 1);
            map.put("name", request.getParameter("name") + (i + 1));
            // Serialize to a JSON string
            String jsonString = JSON.toJSONString(map);
            // Publish with the routing key bound to server.to.client.direct
            directTemplate.convertAndSend("server.to.client.direct.key", jsonString);
        }
        logger.info("发送direct模式mq结束~");
        return "发送direct模式mq结束~";
    }

    /**
     * Sends five messages in fanout (publish/subscribe) mode.
     * @return a completion message
     */
    @RequestMapping(value = "/sendFanout")
    @ResponseBody
    public String sendFanout(HttpServletRequest request) {
        logger.info("发送fanout模式(发布/订阅模式)mq开始~");
        for (int i = 0; i < 5; i++) {
            logger.info("发送第[" + (i + 1) + "]条FanoutMq!");
            // Build the payload map
            Map<String, Object> map = new HashMap<>();
            map.put("index", i + 1);
            map.put("name", request.getParameter("name") + (i + 1));
            // Serialize to a JSON string
            String jsonString = JSON.toJSONString(map);
            // Publish without a routing key; the fanout exchange ignores it
            fanoutTemplate.convertAndSend(jsonString);
        }
        logger.info("发送fanout模式(发布/订阅模式)mq结束~");
        return "发送fanout模式(发布/订阅模式)mq结束~";
    }

    /**
     * Sends five messages in topic (pattern matching) mode.
     * @return a completion message
     */
    @RequestMapping(value = "/sendTopic")
    @ResponseBody
    public String sendTopic(HttpServletRequest request) {
        logger.info("发送topic模式(模糊匹配)mq开始~");
        for (int i = 0; i < 5; i++) {
            logger.info("发送第[" + (i + 1) + "]条TopicMq!");
            // Build the payload map
            Map<String, Object> map = new HashMap<>();
            map.put("index", i + 1);
            map.put("name", request.getParameter("name") + (i + 1));
            // Serialize to a JSON string
            String jsonString = JSON.toJSONString(map);
            // Publish with a routing key that matches the *.*.*.topic binding
            topicTemplate.convertAndSend("a.b.c.topic", jsonString);
        }
        logger.info("发送topic模式(模糊匹配)mq结束~");
        return "发送topic模式(模糊匹配)mq结束~";
    }
}
</code>

===== Client side =====

==== pom.xml ====

<code xml pom.xml>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.3.5.RELEASE</version>
</dependency>
</code>

==== application-mq-rabbit.xml ====

<code xml application-mq-rabbit.xml>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:task="http://www.springframework.org/schema/task"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd
        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- MQ connection settings; replace host and credentials with those of your own broker -->
    <rabbit:connection-factory id="connectionFactory" host="121.40.171.96" username="admin"
        password="admin123" port="5672" />
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- direct mode queue -->
    <rabbit:queue name="server.to.client.direct" durable="true" auto-delete="false" exclusive="false" />

    <!-- fanout mode (publish/subscribe) queues -->
    <rabbit:queue name="server.to.client.fanout1" durable="true" />
    <rabbit:queue name="server.to.client.fanout2" durable="true" />

    <!-- topic mode (pattern matching) queue -->
    <rabbit:queue name="server.to.client.topic" durable="true" />

    <!-- MQ listeners -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
        <!-- direct mode: two listeners compete for messages on the same queue -->
        <rabbit:listener queues="server.to.client.direct" ref="serverToClientDirectListener1" />
        <rabbit:listener queues="server.to.client.direct" ref="serverToClientDirectListener2" />

        <!-- fanout mode (publish/subscribe): one listener per queue -->
        <rabbit:listener queues="server.to.client.fanout1" ref="serverToClientFanoutListener1" />
        <rabbit:listener queues="server.to.client.fanout2" ref="serverToClientFanoutListener2" />

        <!-- topic mode (pattern matching): two listeners compete for messages on the same queue -->
        <rabbit:listener queues="server.to.client.topic" ref="serverToClientTopicListener1" />
        <rabbit:listener queues="server.to.client.topic" ref="serverToClientTopicListener2" />
    </rabbit:listener-container>
</beans>
</code>

==== ServerToClientDirectListener1.java ====

<code java ServerToClientDirectListener1.java>
package com.gxx.dubbo.client.biz.impl;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Service;

/**
 * Listener for the direct mode queue.
 * @author Gxx
 */
@Service(value = "serverToClientDirectListener1")
public class ServerToClientDirectListener1 implements MessageListener {
    /** Logger. */
    private final Logger logger = Logger.getLogger(ServerToClientDirectListener1.class);

    @Override
    public void onMessage(Message message) {
        logger.info("direct模式queue监听者[1]收到MQ内容:" + message);
    }
}
</code>

==== ServerToClientDirectListener2.java ====

<code java ServerToClientDirectListener2.java>
package com.gxx.dubbo.client.biz.impl;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Service;

/**
 * Listener for the direct mode queue.
 * @author Gxx
 */
@Service(value = "serverToClientDirectListener2")
public class ServerToClientDirectListener2 implements MessageListener {
    /** Logger. */
    private final Logger logger = Logger.getLogger(ServerToClientDirectListener2.class);

    @Override
    public void onMessage(Message message) {
        logger.info("direct模式queue监听者[2]收到MQ内容:" + message);
    }
}
</code>

==== ServerToClientFanoutListener1.java ====

<code java ServerToClientFanoutListener1.java>
package com.gxx.dubbo.client.biz.impl;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Service;

/**
 * Listener for a fanout mode (publish/subscribe) queue.
 * @author Gxx
 */
@Service(value = "serverToClientFanoutListener1")
public class ServerToClientFanoutListener1 implements MessageListener {
    /** Logger. */
    private final Logger logger = Logger.getLogger(ServerToClientFanoutListener1.class);

    @Override
    public void onMessage(Message message) {
        logger.info("fanout模式(发布/订阅模式)queue监听者[1]收到MQ内容:" + message);
    }
}
</code>

==== ServerToClientFanoutListener2.java ====

<code java ServerToClientFanoutListener2.java>
package com.gxx.dubbo.client.biz.impl;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Service;

/**
 * Listener for a fanout mode (publish/subscribe) queue.
 * @author Gxx
 */
@Service(value = "serverToClientFanoutListener2")
public class ServerToClientFanoutListener2 implements MessageListener {
    /** Logger. */
    private final Logger logger = Logger.getLogger(ServerToClientFanoutListener2.class);

    @Override
    public void onMessage(Message message) {
        logger.info("fanout模式(发布/订阅模式)queue监听者[2]收到MQ内容:" + message);
    }
}
</code>

==== ServerToClientTopicListener1.java ====

<code java ServerToClientTopicListener1.java>
package com.gxx.dubbo.client.biz.impl;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Service;

/**
 * Listener for the topic mode (pattern matching) queue.
 * @author Gxx
 */
@Service(value = "serverToClientTopicListener1")
public class ServerToClientTopicListener1 implements MessageListener {
    /** Logger. */
    private final Logger logger = Logger.getLogger(ServerToClientTopicListener1.class);

    @Override
    public void onMessage(Message message) {
        logger.info("topic模式(模糊匹配)queue监听者[1]收到MQ内容:" + message);
    }
}
</code>

==== ServerToClientTopicListener2.java ====

<code java ServerToClientTopicListener2.java>
package com.gxx.dubbo.client.biz.impl;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Service;

/**
 * Listener for the topic mode (pattern matching) queue.
 * @author Gxx
 */
@Service(value = "serverToClientTopicListener2")
public class ServerToClientTopicListener2 implements MessageListener {
    /** Logger. */
    private final Logger logger = Logger.getLogger(ServerToClientTopicListener2.class);

    @Override
    public void onMessage(Message message) {
        logger.info("topic模式(模糊匹配)queue监听者[2]收到MQ内容:" + message);
    }
}
</code>

===== Deployment and testing =====

Deploy server-web and client-web to Tomcat.

==== Testing direct mode ====

Visit:

<code>http://localhost:8080/server-web/rabbitmq/sendDirect.htm?name=gxx</code>

Check server.log:

<code>
2017-06-01 10:48:13,177 INFO 发送direct模式mq开始~
2017-06-01 10:48:13,179 INFO 发送第[1]条mq!
2017-06-01 10:48:13,866 INFO 发送第[2]条mq!
2017-06-01 10:48:13,866 INFO 发送第[3]条mq!
2017-06-01 10:48:13,866 INFO 发送第[4]条mq!
2017-06-01 10:48:13,867 INFO 发送第[5]条mq!
2017-06-01 10:48:13,867 INFO 发送direct模式mq结束~
</code>

Check client.log:

<code>
2017-06-01 10:48:15,593 INFO direct模式queue监听者[2]收到MQ内容:(Body:'{"index":2,"name":"gxx2"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=direct.exchange, receivedRoutingKey=server.to.client.direct.key, deliveryTag=1, messageCount=0])
2017-06-01 10:48:15,613 INFO direct模式queue监听者[1]收到MQ内容:(Body:'{"index":1,"name":"gxx1"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=direct.exchange, receivedRoutingKey=server.to.client.direct.key, deliveryTag=1, messageCount=0])
2017-06-01 10:48:15,667 INFO direct模式queue监听者[2]收到MQ内容:(Body:'{"index":3,"name":"gxx3"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=direct.exchange, receivedRoutingKey=server.to.client.direct.key, deliveryTag=2, messageCount=0])
2017-06-01 10:48:15,699 INFO direct模式queue监听者[1]收到MQ内容:(Body:'{"index":4,"name":"gxx4"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=direct.exchange, receivedRoutingKey=server.to.client.direct.key, deliveryTag=2, messageCount=0])
2017-06-01 10:48:15,731 INFO direct模式queue监听者[2]收到MQ内容:(Body:'{"index":5,"name":"gxx5"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=direct.exchange, receivedRoutingKey=server.to.client.direct.key, deliveryTag=3, messageCount=0])
</code>

Both listeners consume from the same queue, so each of the five messages is delivered to only one of them.

==== Testing fanout mode (publish/subscribe) ====

Visit:

<code>http://localhost:8080/server-web/rabbitmq/sendFanout.htm?name=gxx</code>

Check server.log:

<code>
2017-06-01 10:49:13,634 INFO 发送fanout模式(发布/订阅模式)mq开始~
2017-06-01 10:49:13,634 INFO 发送第[1]条FanoutMq!
2017-06-01 10:49:13,635 INFO 发送第[2]条FanoutMq!
2017-06-01 10:49:13,636 INFO 发送第[3]条FanoutMq!
2017-06-01 10:49:13,636 INFO 发送第[4]条FanoutMq!
2017-06-01 10:49:13,637 INFO 发送第[5]条FanoutMq!
2017-06-01 10:49:13,637 INFO 发送fanout模式(发布/订阅模式)mq结束~
</code>

Check client.log:

<code>
2017-06-01 10:49:13,671 INFO fanout模式(发布/订阅模式)queue监听者[1]收到MQ内容:(Body:'{"index":1,"name":"gxx1"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=1, messageCount=0])
2017-06-01 10:49:13,673 INFO fanout模式(发布/订阅模式)queue监听者[2]收到MQ内容:(Body:'{"index":1,"name":"gxx1"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=1, messageCount=0])
2017-06-01 10:49:13,736 INFO fanout模式(发布/订阅模式)queue监听者[2]收到MQ内容:(Body:'{"index":2,"name":"gxx2"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=2, messageCount=0])
2017-06-01 10:49:13,738 INFO fanout模式(发布/订阅模式)queue监听者[1]收到MQ内容:(Body:'{"index":2,"name":"gxx2"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=2, messageCount=0])
2017-06-01 10:49:13,810 INFO fanout模式(发布/订阅模式)queue监听者[1]收到MQ内容:(Body:'{"index":3,"name":"gxx3"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=3, messageCount=0])
2017-06-01 10:49:13,810 INFO fanout模式(发布/订阅模式)queue监听者[2]收到MQ内容:(Body:'{"index":3,"name":"gxx3"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=3, messageCount=0])
2017-06-01 10:49:13,872 INFO fanout模式(发布/订阅模式)queue监听者[2]收到MQ内容:(Body:'{"index":4,"name":"gxx4"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=4, messageCount=0])
2017-06-01 10:49:13,874 INFO fanout模式(发布/订阅模式)queue监听者[1]收到MQ内容:(Body:'{"index":4,"name":"gxx4"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=4, messageCount=0])
2017-06-01 10:49:13,941 INFO fanout模式(发布/订阅模式)queue监听者[2]收到MQ内容:(Body:'{"index":5,"name":"gxx5"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=5, messageCount=0])
2017-06-01 10:49:13,943 INFO fanout模式(发布/订阅模式)queue监听者[1]收到MQ内容:(Body:'{"index":5,"name":"gxx5"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=5, messageCount=0])
</code>

The fanout exchange copies every message to both queues, so each listener receives all five messages.

==== Testing topic mode (pattern matching) ====

Visit:

<code>http://localhost:8080/server-web/rabbitmq/sendTopic.htm?name=gxx</code>

Check server.log:

<code>
2017-06-01 10:50:10,863 INFO 发送topic模式(模糊匹配)mq开始~
2017-06-01 10:50:10,863 INFO 发送第[1]条TopicMq!
2017-06-01 10:50:10,864 INFO 发送第[2]条TopicMq!
2017-06-01 10:50:10,864 INFO 发送第[3]条TopicMq!
2017-06-01 10:50:10,864 INFO 发送第[4]条TopicMq!
2017-06-01 10:50:10,865 INFO 发送第[5]条TopicMq!
2017-06-01 10:50:10,865 INFO 发送topic模式(模糊匹配)mq结束~
</code>

Check client.log:

<code>
2017-06-01 10:50:10,903 INFO topic模式(模糊匹配)queue监听者[2]收到MQ内容:(Body:'{"index":2,"name":"gxx2"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=a.b.c.topic, deliveryTag=1, messageCount=0])
2017-06-01 10:50:10,904 INFO topic模式(模糊匹配)queue监听者[1]收到MQ内容:(Body:'{"index":1,"name":"gxx1"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=a.b.c.topic, deliveryTag=1, messageCount=0])
2017-06-01 10:50:10,995 INFO topic模式(模糊匹配)queue监听者[1]收到MQ内容:(Body:'{"index":3,"name":"gxx3"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=a.b.c.topic, deliveryTag=2, messageCount=0])
2017-06-01 10:50:10,995 INFO topic模式(模糊匹配)queue监听者[2]收到MQ内容:(Body:'{"index":4,"name":"gxx4"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=a.b.c.topic, deliveryTag=2, messageCount=0])
2017-06-01 10:50:11,277 INFO topic模式(模糊匹配)queue监听者[2]收到MQ内容:(Body:'{"index":5,"name":"gxx5"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=a.b.c.topic, deliveryTag=3, messageCount=0])
</code>

The routing key a.b.c.topic matches the binding pattern *.*.*.topic. The two listeners share one queue, so each message is again delivered to only one of them.

===== Observing the RabbitMQ management console =====

{{:分享:技术:jms:rabbitmq-1.jpeg?800|}}

{{:分享:技术:jms:rabbitmq-2.jpeg?800|}}

{{:分享:技术:jms:rabbitmq-3.jpeg?800|}}

===== Source code attached =====

{{:分享:技术:jms:server.zip|}}

{{:分享:技术:jms:client.zip|}}

===== Explanation of several properties =====

About queue:

  - queue-name: The queue name MAY be empty, in which case the server MUST create a new queue with a unique generated name and return this to the client in the Declare-Ok method.
  - passive: If set, the server will reply with Declare-Ok if the queue already exists with the same name, and raise an error if not.
  - durable: If set when creating a new queue, the queue will be marked as durable. Durable queues remain active when a server restarts. Non-durable queues (transient queues) are purged if/when a server restarts. Note that durable queues do not necessarily hold persistent messages, although it does not make sense to send persistent messages to a transient queue. The server MUST recreate the durable queue after a restart. The server MUST support both durable and transient queues.
  - exclusive: Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections is not allowed. The server MUST support both exclusive (private) and non-exclusive (shared) queues. The client MAY NOT attempt to use a queue that was declared as exclusive by another still-open connection. Error code: resource-locked
  - auto-delete: If set, the queue is deleted when all consumers have finished using it. The last consumer can be cancelled either explicitly or because its channel is closed. If there was no consumer ever on the queue, it won't be deleted. Applications can explicitly delete auto-delete queues using the Delete method as normal. The server MUST ignore the auto-delete field if the queue already exists.

About exchange:

  - passive: If set, the server will reply with Declare-Ok if the exchange already exists with the same name, and raise an error if not.
  - durable: If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts.
  - auto-delete: If set, the exchange is deleted when all queues have finished using it. The server SHOULD allow for a reasonable delay between the point when it determines that an exchange is not being used (or no longer used), and the point when it deletes the exchange. At the least it must allow a client to create an exchange and then bind a queue to it, with a small but non-zero delay between these two actions. The server MUST ignore the auto-delete field if the exchange already exists.
  - internal: If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications.
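
===== Sketch: how a topic pattern matches a routing key =====

The topic binding in this page uses the pattern *.*.*.topic and the controller publishes with the routing key a.b.c.topic. The following is a minimal sketch of the matching rule in plain Java, assuming the usual AMQP topic semantics: words are separated by dots, * stands for exactly one word and # stands for zero or more words. It is for illustration only and is not how the broker implements routing; the class name TopicPatternMatcher and its matches method are hypothetical and not part of RabbitMQ or Spring AMQP.

<code java TopicPatternMatcher.java>
/**
 * Illustrative helper (hypothetical, not a RabbitMQ or Spring AMQP class)
 * that mimics how a topic exchange compares a binding pattern with a routing key.
 */
final class TopicPatternMatcher {

    private TopicPatternMatcher() {
    }

    /** Returns true if the routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        // Both sides are sequences of words separated by dots
        String[] patternWords = pattern.split("\\.", -1);
        String[] keyWords = routingKey.split("\\.", -1);
        return matches(patternWords, 0, keyWords, 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: succeed only if the key is exhausted too
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible length
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left
            return false;
        }
        // '*' accepts any single word; anything else must match literally
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("*.*.*.topic", "a.b.c.topic")); // true
        System.out.println(matches("*.*.*.topic", "a.b.topic"));   // false
        System.out.println(matches("#.topic", "a.b.topic"));       // true
    }
}
</code>

Under these rules a.b.c.topic reaches server.to.client.topic, while a key with fewer words such as a.b.topic would not match *.*.*.topic; a binding like #.topic would accept both.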
Last modified: 2017/06/01 16:12 by gxx