====== spring整合rabbitmq ====== ===== 官网 ===== [[http://www.rabbitmq.com/getstarted.html|官网]] ===== 三种模式 ===== * direct模式 {{:分享:技术:jms:direct.jpeg?300|}} * fanout模式(发布/订阅模式) {{:分享:技术:jms:fanout.jpeg?300|}} * topic模式(模糊匹配) {{:分享:技术:jms:topic.jpeg?500|}} ===== server端 ===== ==== pom.xml ==== org.springframework.amqp spring-rabbit 1.3.5.RELEASE ==== application-mq-rabbit.xml ==== ==== RabbitmqController.java ==== package com.gxx.dubbo.server.web; import java.util.HashMap; import java.util.Map; import javax.servlet.http.HttpServletRequest; import org.apache.log4j.Logger; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import com.alibaba.fastjson.JSON; /** * rabbitmq控制类 * @author Gxx */ @Controller @RequestMapping("/rabbitmq/") public class RabbitmqController { /** * 日志处理器 */ private final Logger logger = Logger.getLogger(RabbitmqController.class); /** * direct mq执行器 */ @Autowired private AmqpTemplate directTemplate; /** * fanout mq执行器 */ @Autowired private AmqpTemplate fanoutTemplate; /** * topic mq执行器 */ @Autowired private AmqpTemplate topicTemplate; /** * 发送direct模式mq * @return */ @RequestMapping(value = "/sendDirect") public @ResponseBody String sendDirect(HttpServletRequest request) { logger.info("发送direct模式mq开始~"); for(int i=0;i<5;i++) { logger.info("发送第[" + (i+1) + "]条mq!"); /** * 组织map */ Map map = new HashMap<>(); map.put("index", i+1); map.put("name", request.getParameter("name") + (i+1)); /** * 转json串 */ String jsonString = JSON.toJSONString(map); /** * 发送mq */ directTemplate.convertAndSend("server.to.client.direct.key", jsonString); } logger.info("发送direct模式mq结束~"); return "发送direct模式mq结束~"; } /** * 发送fanout模式(发布/订阅模式)mq * @return */ @RequestMapping(value = "/sendFanout") public @ResponseBody String sendFanout(HttpServletRequest request) { logger.info("发送fanout模式(发布/订阅模式)mq开始~"); for(int i=0;i<5;i++) { logger.info("发送第[" + (i+1) + "]条FanoutMq!"); /** * 组织map */ Map map = new HashMap<>(); map.put("index", i+1); map.put("name", request.getParameter("name") + (i+1)); /** * 转json串 */ String jsonString = JSON.toJSONString(map); /** * 发送mq */ fanoutTemplate.convertAndSend(jsonString); } logger.info("发送fanout模式(发布/订阅模式)mq结束~"); return "发送fanout模式(发布/订阅模式)mq结束~"; } /** * 发送topic模式(模糊匹配)mq * @return */ @RequestMapping(value = "/sendTopic") public @ResponseBody String sendTopic(HttpServletRequest request) { logger.info("发送topic模式(模糊匹配)mq开始~"); for(int i=0;i<5;i++) { logger.info("发送第[" + (i+1) + "]条TopicMq!"); /** * 组织map */ Map map = new HashMap<>(); map.put("index", i+1); map.put("name", request.getParameter("name") + (i+1)); /** * 转json串 */ String jsonString = JSON.toJSONString(map); /** * 发送mq */ topicTemplate.convertAndSend("a.b.c.topic", jsonString); } logger.info("发送topic模式(模糊匹配)mq结束~"); return "发送topic模式(模糊匹配)mq结束~"; } } ===== client端 ===== ==== pom.xml ==== org.springframework.amqp spring-rabbit 1.3.5.RELEASE ==== application-mq-rabbit.xml ==== ==== ServerToClientDirectListener1.java ==== package com.gxx.dubbo.client.biz.impl; import org.apache.log4j.Logger; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.stereotype.Service; /** * direct模式queue监听者 * @author Gxx */ @Service(value = "serverToClientDirectListener1") public class ServerToClientDirectListener1 implements MessageListener { /** * 日志处理器 */ private final Logger logger = Logger.getLogger(ServerToClientDirectListener1.class); @Override public void onMessage(Message message) { logger.info("direct模式queue监听者[1]收到MQ内容:" + message); } } ==== ServerToClientDirectListener2.java ==== package com.gxx.dubbo.client.biz.impl; import org.apache.log4j.Logger; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.stereotype.Service; /** * direct模式queue监听者 * @author Gxx */ @Service(value = "serverToClientDirectListener2") public class ServerToClientDirectListener2 implements MessageListener { /** * 日志处理器 */ private final Logger logger = Logger.getLogger(ServerToClientDirectListener2.class); @Override public void onMessage(Message message) { logger.info("direct模式queue监听者[2]收到MQ内容:" + message); } } ==== ServerToClientFanoutListener1.java ==== package com.gxx.dubbo.client.biz.impl; import org.apache.log4j.Logger; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.stereotype.Service; /** * fanout模式(发布/订阅模式)queue监听者 * @author Gxx */ @Service(value = "serverToClientFanoutListener1") public class ServerToClientFanoutListener1 implements MessageListener { /** * 日志处理器 */ private final Logger logger = Logger.getLogger(ServerToClientFanoutListener1.class); @Override public void onMessage(Message message) { logger.info("fanout模式(发布/订阅模式)queue监听者[1]收到MQ内容:" + message); } } ==== ServerToClientFanoutListener2.java ==== package com.gxx.dubbo.client.biz.impl; import org.apache.log4j.Logger; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.stereotype.Service; /** * fanout模式(发布/订阅模式)queue监听者 * @author Gxx */ @Service(value = "serverToClientFanoutListener2") public class ServerToClientFanoutListener2 implements MessageListener { /** * 日志处理器 */ private final Logger logger = Logger.getLogger(ServerToClientFanoutListener2.class); @Override public void onMessage(Message message) { logger.info("fanout模式(发布/订阅模式)queue监听者[2]收到MQ内容:" + message); } } ==== ServerToClientTopicListener1.java ==== package com.gxx.dubbo.client.biz.impl; import org.apache.log4j.Logger; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.stereotype.Service; /** * topic模式(模糊匹配)queue监听者 * @author Gxx */ @Service(value = "serverToClientTopicListener1") public class ServerToClientTopicListener1 implements MessageListener { /** * 日志处理器 */ private final Logger logger = Logger.getLogger(ServerToClientTopicListener1.class); @Override public void onMessage(Message message) { logger.info("topic模式(模糊匹配)queue监听者[1]收到MQ内容:" + message); } } ==== ServerToClientTopicListener2.java ==== package com.gxx.dubbo.client.biz.impl; import org.apache.log4j.Logger; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.stereotype.Service; /** * topic模式(模糊匹配)queue监听者 * @author Gxx */ @Service(value = "serverToClientTopicListener2") public class ServerToClientTopicListener2 implements MessageListener { /** * 日志处理器 */ private final Logger logger = Logger.getLogger(ServerToClientTopicListener2.class); @Override public void onMessage(Message message) { logger.info("topic模式(模糊匹配)queue监听者[2]收到MQ内容:" + message); } } ===== 部署测试 ===== 部署server-web和client-web到tomcat ==== 测试direct模式 ==== 访问 http://localhost:8080/server-web/rabbitmq/sendDirect.htm?name=gxx 观察server.log 2017-06-01 10:48:13,177 INFO 发送direct模式mq开始~ 2017-06-01 10:48:13,179 INFO 发送第[1]条mq! 2017-06-01 10:48:13,866 INFO 发送第[2]条mq! 2017-06-01 10:48:13,866 INFO 发送第[3]条mq! 2017-06-01 10:48:13,866 INFO 发送第[4]条mq! 2017-06-01 10:48:13,867 INFO 发送第[5]条mq! 2017-06-01 10:48:13,867 INFO 发送direct模式mq结束~ 观察client.log 2017-06-01 10:48:15,593 INFO direct模式queue监听者[2]收到MQ内容:(Body:'{"index":2,"name":"gxx2"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=direct.exchange, receivedRoutingKey=server.to.client.direct.key, deliveryTag=1, messageCount=0]) 2017-06-01 10:48:15,613 INFO direct模式queue监听者[1]收到MQ内容:(Body:'{"index":1,"name":"gxx1"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=direct.exchange, receivedRoutingKey=server.to.client.direct.key, deliveryTag=1, messageCount=0]) 2017-06-01 10:48:15,667 INFO direct模式queue监听者[2]收到MQ内容:(Body:'{"index":3,"name":"gxx3"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=direct.exchange, receivedRoutingKey=server.to.client.direct.key, deliveryTag=2, messageCount=0]) 2017-06-01 10:48:15,699 INFO direct模式queue监听者[1]收到MQ内容:(Body:'{"index":4,"name":"gxx4"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=direct.exchange, receivedRoutingKey=server.to.client.direct.key, deliveryTag=2, messageCount=0]) 2017-06-01 10:48:15,731 INFO direct模式queue监听者[2]收到MQ内容:(Body:'{"index":5,"name":"gxx5"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=direct.exchange, receivedRoutingKey=server.to.client.direct.key, deliveryTag=3, messageCount=0]) ==== 测试fanout模式(发布/订阅模式) ==== 访问 http://localhost:8080/server-web/rabbitmq/sendFanout.htm?name=gxx 观察server.log 2017-06-01 10:49:13,634 INFO 发送fanout模式(发布/订阅模式)mq开始~ 2017-06-01 10:49:13,634 INFO 发送第[1]条FanoutMq! 2017-06-01 10:49:13,635 INFO 发送第[2]条FanoutMq! 2017-06-01 10:49:13,636 INFO 发送第[3]条FanoutMq! 2017-06-01 10:49:13,636 INFO 发送第[4]条FanoutMq! 2017-06-01 10:49:13,637 INFO 发送第[5]条FanoutMq! 2017-06-01 10:49:13,637 INFO 发送fanout模式(发布/订阅模式)mq结束~ 观察client.log 2017-06-01 10:49:13,671 INFO fanout模式(发布/订阅模式)queue监听者[1]收到MQ内容:(Body:'{"index":1,"name":"gxx1"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=1, messageCount=0]) 2017-06-01 10:49:13,673 INFO fanout模式(发布/订阅模式)queue监听者[2]收到MQ内容:(Body:'{"index":1,"name":"gxx1"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=1, messageCount=0]) 2017-06-01 10:49:13,736 INFO fanout模式(发布/订阅模式)queue监听者[2]收到MQ内容:(Body:'{"index":2,"name":"gxx2"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=2, messageCount=0]) 2017-06-01 10:49:13,738 INFO fanout模式(发布/订阅模式)queue监听者[1]收到MQ内容:(Body:'{"index":2,"name":"gxx2"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=2, messageCount=0]) 2017-06-01 10:49:13,810 INFO fanout模式(发布/订阅模式)queue监听者[1]收到MQ内容:(Body:'{"index":3,"name":"gxx3"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=3, messageCount=0]) 2017-06-01 10:49:13,810 INFO fanout模式(发布/订阅模式)queue监听者[2]收到MQ内容:(Body:'{"index":3,"name":"gxx3"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=3, messageCount=0]) 2017-06-01 10:49:13,872 INFO fanout模式(发布/订阅模式)queue监听者[2]收到MQ内容:(Body:'{"index":4,"name":"gxx4"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=4, messageCount=0]) 2017-06-01 10:49:13,874 INFO fanout模式(发布/订阅模式)queue监听者[1]收到MQ内容:(Body:'{"index":4,"name":"gxx4"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=4, messageCount=0]) 2017-06-01 10:49:13,941 INFO fanout模式(发布/订阅模式)queue监听者[2]收到MQ内容:(Body:'{"index":5,"name":"gxx5"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=5, messageCount=0]) 2017-06-01 10:49:13,943 INFO fanout模式(发布/订阅模式)queue监听者[1]收到MQ内容:(Body:'{"index":5,"name":"gxx5"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=5, messageCount=0]) ==== 测试topic模式(模糊匹配) ==== 访问 http://localhost:8080/server-web/rabbitmq/sendTopic.htm?name=gxx 观察server.log 2017-06-01 10:50:10,863 INFO 发送topic模式(模糊匹配)mq开始~ 2017-06-01 10:50:10,863 INFO 发送第[1]条TopicMq! 2017-06-01 10:50:10,864 INFO 发送第[2]条TopicMq! 2017-06-01 10:50:10,864 INFO 发送第[3]条TopicMq! 2017-06-01 10:50:10,864 INFO 发送第[4]条TopicMq! 2017-06-01 10:50:10,865 INFO 发送第[5]条TopicMq! 2017-06-01 10:50:10,865 INFO 发送topic模式(模糊匹配)mq结束~ 观察client.log 2017-06-01 10:50:10,903 INFO topic模式(模糊匹配)queue监听者[2]收到MQ内容:(Body:'{"index":2,"name":"gxx2"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=a.b.c.topic, deliveryTag=1, messageCount=0]) 2017-06-01 10:50:10,904 INFO topic模式(模糊匹配)queue监听者[1]收到MQ内容:(Body:'{"index":1,"name":"gxx1"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=a.b.c.topic, deliveryTag=1, messageCount=0]) 2017-06-01 10:50:10,995 INFO topic模式(模糊匹配)queue监听者[1]收到MQ内容:(Body:'{"index":3,"name":"gxx3"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=a.b.c.topic, deliveryTag=2, messageCount=0]) 2017-06-01 10:50:10,995 INFO topic模式(模糊匹配)queue监听者[2]收到MQ内容:(Body:'{"index":4,"name":"gxx4"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=a.b.c.topic, deliveryTag=2, messageCount=0]) 2017-06-01 10:50:11,277 INFO topic模式(模糊匹配)queue监听者[2]收到MQ内容:(Body:'{"index":5,"name":"gxx5"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=a.b.c.topic, deliveryTag=3, messageCount=0]) ===== rabbitmq控台观察 ===== {{:分享:技术:jms:rabbitmq-1.jpeg?800|}} {{:分享:技术:jms:rabbitmq-2.jpeg?800|}} {{:分享:技术:jms:rabbitmq-3.jpeg?800|}} ===== 附源码 ===== {{:分享:技术:jms:server.zip|}} {{:分享:技术:jms:client.zip|}} ===== 几个属性的解释 ===== About queue: 1 queue-name The queue name MAY be empty, in which case the server MUST create a new queue with a unique generated name and return this to the client in the Declare-Ok method. 2 passive If set, the server will reply with Declare-Ok if the queue already exists with the same name, and raise an error if not. 3 durable If set when creating a new queue, the queue will be marked as durable. Durable queues remain active when a server restarts. Non-durable queues (transient queues) are purged if/when a server restarts. Note that durable queues do not necessarily hold persistent messages, although it does not make sense to send persistent messages to a transient queue. The server MUST recreate the durable queue after a restart. The server MUST support both durable and transient queues. 4 exclusive Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections are not allowed. The server MUST support both exclusive (private) and non-exclusive (shared) queues. The client MAY NOT attempt to use a queue that was declared as exclusive by another still-open connection. Error code: resource-locked 5 auto-delete If set, the queue is deleted when all consumers have finished using it. The last consumer can be cancelled either explicitly or because its channel is closed. If there was no consumer ever on the queue, it won't be deleted. Applications can explicitly delete auto-delete queues using the Delete method as normal. The server MUST ignore the auto-delete field if the queue already exists. About exchange: 1 passive If set, the server will reply with Declare-Ok if the exchange already exists with the same name, and raise an error if not. 2 durable If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts. 3 auto-delete If set, the exchange is deleted when all queues have finished using it. The server SHOULD allow for a reasonable delay between the point when it determines that an exchange is not being used (or no longer used), and the point when it deletes the exchange. At the least it must allow a client to create an exchange and then bind a queue to it, with a small but non-zero delay between these two actions. The server MUST ignore the auto-delete field if the exchange already exists. 4 internal If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications.