目录

spring整合rabbitmq

官网

官网

三种模式

server端

pom.xml

pom.xml
<!-- Spring AMQP RabbitMQ module (spring-rabbit), pinned to 1.3.5.RELEASE -->
<dependency>
	<groupId>org.springframework.amqp</groupId>
	<artifactId>spring-rabbit</artifactId>
	<version>1.3.5.RELEASE</version>
</dependency>

application-mq-rabbit.xml

application-mq-rabbit.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
	Server-side (publisher) RabbitMQ wiring.
	Declares one connection factory, one admin, and for each of the three
	exchange types (direct, fanout, topic) a template, its queue(s) and the
	exchange with its bindings.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:task="http://www.springframework.org/schema/task"
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
	  http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
	  http://www.springframework.org/schema/context
	  http://www.springframework.org/schema/context/spring-context.xsd
	  http://www.springframework.org/schema/task
	  http://www.springframework.org/schema/task/spring-task-3.0.xsd
	  http://www.springframework.org/schema/rabbit
	  http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

	<!-- Broker connection settings -->
	<!-- NOTE(review): host and credentials are hard-coded here; consider moving them to an external properties file. -->
	<rabbit:connection-factory id="connectionFactory"
				host="121.40.171.96"
				username="admin"
				password="admin123"
				port="5672" />

	<!-- Admin bean bound to the connection factory above -->
	<rabbit:admin connection-factory="connectionFactory" />

	<!-- ===== direct mode: begin ===== -->
	<!-- Template that publishes to direct.exchange -->
	<rabbit:template id="directTemplate"
		connection-factory="connectionFactory"
		exchange="direct.exchange" />

	<!-- Queue -->
	<rabbit:queue name="server.to.client.direct"
		durable="true"
		auto-delete="false"
		exclusive="false" />

	<!-- Exchange -->
	<rabbit:direct-exchange name="direct.exchange"
		durable="true"
		auto-delete="false">

		<rabbit:bindings>
			<!-- Binding: routing key server.to.client.direct.key to queue server.to.client.direct -->
			<rabbit:binding queue="server.to.client.direct"
				key="server.to.client.direct.key" />
		</rabbit:bindings>

	</rabbit:direct-exchange>
	<!-- ===== direct mode: end ===== -->

	<!-- ===== fanout mode (publish/subscribe): begin ===== -->
	<!-- Template that publishes to fanout.exchange -->
	<rabbit:template id="fanoutTemplate"
		connection-factory="connectionFactory"
		exchange="fanout.exchange" />

	<!-- Queues -->
	<rabbit:queue name="server.to.client.fanout1" durable="true" />
	<rabbit:queue name="server.to.client.fanout2" durable="true" />

	<!-- Exchange: both queues are bound without a routing key -->
	<rabbit:fanout-exchange name="fanout.exchange" durable="true">
		<rabbit:bindings>
			<rabbit:binding queue="server.to.client.fanout1" />
			<rabbit:binding queue="server.to.client.fanout2" />
		</rabbit:bindings>
	</rabbit:fanout-exchange>
	<!-- ===== fanout mode (publish/subscribe): end ===== -->

	<!-- ===== topic mode (pattern matching): begin ===== -->
	<!-- Template that publishes to topic.exchange -->
	<rabbit:template id="topicTemplate"
		connection-factory="connectionFactory"
		exchange="topic.exchange" />

	<!-- Queue -->
	<rabbit:queue name="server.to.client.topic" durable="true" />

	<!-- Exchange: binds the queue with the pattern *.*.*.topic -->
	<rabbit:topic-exchange name="topic.exchange" durable="true">
		<rabbit:bindings>
			<rabbit:binding queue="server.to.client.topic" pattern="*.*.*.topic" />
		</rabbit:bindings>
	</rabbit:topic-exchange>
	<!-- ===== topic mode (pattern matching): end ===== -->

</beans>

RabbitmqController.java

RabbitmqController.java
package com.gxx.dubbo.server.web;
 
import java.util.HashMap;
import java.util.Map;
 
import javax.servlet.http.HttpServletRequest;
 
import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
 
import com.alibaba.fastjson.JSON;
 
/**
 * rabbitmq控制类
 * @author Gxx
 */
@Controller
@RequestMapping("/rabbitmq/")
public class RabbitmqController {
 
	/**
	 * 日志处理器
	 */
	private final Logger logger = Logger.getLogger(RabbitmqController.class);
 
	/**
	 * direct mq执行器
	 */
	@Autowired
	private AmqpTemplate directTemplate;
 
	/**
	 * fanout mq执行器
	 */
	@Autowired
	private AmqpTemplate fanoutTemplate;
 
	/**
	 * topic mq执行器
	 */
	@Autowired
	private AmqpTemplate topicTemplate;
 
	/**
	 * 发送direct模式mq
	 * @return
	 */
	@RequestMapping(value = "/sendDirect")
	public @ResponseBody String sendDirect(HttpServletRequest request) {
		logger.info("发送direct模式mq开始~");
		for(int i=0;i<5;i++) {
			logger.info("发送第[" + (i+1) + "]条mq!");
			/**
			 * 组织map
			 */
			Map<String, Object> map = new HashMap<>();
			map.put("index", i+1);
			map.put("name", request.getParameter("name") + (i+1));
			/**
			 * 转json串
			 */
			String jsonString  = JSON.toJSONString(map);
			/**
			 * 发送mq
			 */
			directTemplate.convertAndSend("server.to.client.direct.key", jsonString);
		}
		logger.info("发送direct模式mq结束~");
		return "发送direct模式mq结束~";
	}
 
	/**
	 * 发送fanout模式(发布/订阅模式)mq
	 * @return
	 */
	@RequestMapping(value = "/sendFanout")
	public @ResponseBody String sendFanout(HttpServletRequest request) {
		logger.info("发送fanout模式(发布/订阅模式)mq开始~");
		for(int i=0;i<5;i++) {
			logger.info("发送第[" + (i+1) + "]条FanoutMq!");
			/**
			 * 组织map
			 */
			Map<String, Object> map = new HashMap<>();
			map.put("index", i+1);
			map.put("name", request.getParameter("name") + (i+1));
			/**
			 * 转json串
			 */
			String jsonString  = JSON.toJSONString(map);
			/**
			 * 发送mq
			 */
			fanoutTemplate.convertAndSend(jsonString);
		}
		logger.info("发送fanout模式(发布/订阅模式)mq结束~");
		return "发送fanout模式(发布/订阅模式)mq结束~";
	}
 
	/**
	 * 发送topic模式(模糊匹配)mq
	 * @return
	 */
	@RequestMapping(value = "/sendTopic")
	public @ResponseBody String sendTopic(HttpServletRequest request) {
		logger.info("发送topic模式(模糊匹配)mq开始~");
		for(int i=0;i<5;i++) {
			logger.info("发送第[" + (i+1) + "]条TopicMq!");
			/**
			 * 组织map
			 */
			Map<String, Object> map = new HashMap<>();
			map.put("index", i+1);
			map.put("name", request.getParameter("name") + (i+1));
			/**
			 * 转json串
			 */
			String jsonString  = JSON.toJSONString(map);
			/**
			 * 发送mq
			 */
			topicTemplate.convertAndSend("a.b.c.topic", jsonString);
		}
		logger.info("发送topic模式(模糊匹配)mq结束~");
		return "发送topic模式(模糊匹配)mq结束~";
	}
}

client端

pom.xml

pom.xml
<!-- Spring AMQP RabbitMQ module (spring-rabbit), pinned to 1.3.5.RELEASE -->
<dependency>
	<groupId>org.springframework.amqp</groupId>
	<artifactId>spring-rabbit</artifactId>
	<version>1.3.5.RELEASE</version>
</dependency>

application-mq-rabbit.xml

application-mq-rabbit.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
	Client-side (consumer) RabbitMQ wiring.
	Declares the connection factory, the admin, the four queues and one
	listener container that attaches listener beans to those queues.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:task="http://www.springframework.org/schema/task"
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
	  http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
	  http://www.springframework.org/schema/context
	  http://www.springframework.org/schema/context/spring-context.xsd
	  http://www.springframework.org/schema/task
	  http://www.springframework.org/schema/task/spring-task-3.0.xsd
	  http://www.springframework.org/schema/rabbit
	  http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

	<!-- Broker connection settings -->
	<!-- NOTE(review): host and credentials are hard-coded here; consider moving them to an external properties file. -->
	<rabbit:connection-factory id="connectionFactory"
				host="121.40.171.96"
				username="admin"
				password="admin123"
				port="5672" />

	<!-- Admin bean bound to the connection factory above -->
	<rabbit:admin connection-factory="connectionFactory" />

	<!-- direct mode queue -->
	<rabbit:queue name="server.to.client.direct" durable="true"
		auto-delete="false" exclusive="false" />

	<!-- fanout mode (publish/subscribe) queues -->
	<rabbit:queue name="server.to.client.fanout1" durable="true" />
	<rabbit:queue name="server.to.client.fanout2" durable="true" />

	<!-- topic mode (pattern matching) queue -->
	<rabbit:queue name="server.to.client.topic" durable="true" />

	<!-- Message listeners; each ref names a listener bean by its bean id -->
	<rabbit:listener-container
		connection-factory="connectionFactory"
		acknowledge="auto">
		<!-- direct mode: two listeners on the same queue -->
		<rabbit:listener queues="server.to.client.direct" ref="serverToClientDirectListener1" />
		<rabbit:listener queues="server.to.client.direct" ref="serverToClientDirectListener2" />
		<!-- fanout mode (publish/subscribe): one listener per queue -->
		<rabbit:listener queues="server.to.client.fanout1" ref="serverToClientFanoutListener1" />
		<rabbit:listener queues="server.to.client.fanout2" ref="serverToClientFanoutListener2" />
		<!-- topic mode (pattern matching): two listeners on the same queue -->
		<rabbit:listener queues="server.to.client.topic" ref="serverToClientTopicListener1" />
		<rabbit:listener queues="server.to.client.topic" ref="serverToClientTopicListener2" />
	</rabbit:listener-container>

</beans>

ServerToClientDirectListener1.java

ServerToClientDirectListener1.java
package com.gxx.dubbo.client.biz.impl;
 
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Service;
 
/**
 * direct模式queue监听者
 * @author Gxx
 */
@Service(value = "serverToClientDirectListener1")
public class ServerToClientDirectListener1 implements MessageListener {
 
	/**
	 * 日志处理器
	 */
	private final Logger logger = Logger.getLogger(ServerToClientDirectListener1.class);
 
	@Override
	public void onMessage(Message message) {
		logger.info("direct模式queue监听者[1]收到MQ内容:" + message);
	}
 
}

ServerToClientDirectListener2.java

ServerToClientDirectListener2.java
package com.gxx.dubbo.client.biz.impl;
 
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Service;
 
/**
 * direct模式queue监听者
 * @author Gxx
 */
@Service(value = "serverToClientDirectListener2")
public class ServerToClientDirectListener2 implements MessageListener {
 
	/**
	 * 日志处理器
	 */
	private final Logger logger = Logger.getLogger(ServerToClientDirectListener2.class);
 
	@Override
	public void onMessage(Message message) {
		logger.info("direct模式queue监听者[2]收到MQ内容:" + message);
	}
 
}

ServerToClientFanoutListener1.java

ServerToClientFanoutListener1.java
package com.gxx.dubbo.client.biz.impl;
 
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Service;
 
/**
 * fanout模式(发布/订阅模式)queue监听者
 * @author Gxx
 */
@Service(value = "serverToClientFanoutListener1")
public class ServerToClientFanoutListener1 implements MessageListener {
 
	/**
	 * 日志处理器
	 */
	private final Logger logger = Logger.getLogger(ServerToClientFanoutListener1.class);
 
	@Override
	public void onMessage(Message message) {
		logger.info("fanout模式(发布/订阅模式)queue监听者[1]收到MQ内容:" + message);
	}
 
}

ServerToClientFanoutListener2.java

ServerToClientFanoutListener2.java
package com.gxx.dubbo.client.biz.impl;
 
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Service;
 
/**
 * fanout模式(发布/订阅模式)queue监听者
 * @author Gxx
 */
@Service(value = "serverToClientFanoutListener2")
public class ServerToClientFanoutListener2 implements MessageListener {
 
	/**
	 * 日志处理器
	 */
	private final Logger logger = Logger.getLogger(ServerToClientFanoutListener2.class);
 
	@Override
	public void onMessage(Message message) {
		logger.info("fanout模式(发布/订阅模式)queue监听者[2]收到MQ内容:" + message);
	}
 
}

ServerToClientTopicListener1.java

ServerToClientTopicListener1.java
package com.gxx.dubbo.client.biz.impl;
 
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Service;
 
/**
 * topic模式(模糊匹配)queue监听者
 * @author Gxx
 */
@Service(value = "serverToClientTopicListener1")
public class ServerToClientTopicListener1 implements MessageListener {
 
	/**
	 * 日志处理器
	 */
	private final Logger logger = Logger.getLogger(ServerToClientTopicListener1.class);
 
	@Override
	public void onMessage(Message message) {
		logger.info("topic模式(模糊匹配)queue监听者[1]收到MQ内容:" + message);
	}
 
}

ServerToClientTopicListener2.java

ServerToClientTopicListener2.java
package com.gxx.dubbo.client.biz.impl;
 
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Service;
 
/**
 * topic模式(模糊匹配)queue监听者
 * @author Gxx
 */
@Service(value = "serverToClientTopicListener2")
public class ServerToClientTopicListener2 implements MessageListener {
 
	/**
	 * 日志处理器
	 */
	private final Logger logger = Logger.getLogger(ServerToClientTopicListener2.class);
 
	@Override
	public void onMessage(Message message) {
		logger.info("topic模式(模糊匹配)queue监听者[2]收到MQ内容:" + message);
	}
 
}

部署测试

部署server-web和client-web到tomcat

测试direct模式

访问

http://localhost:8080/server-web/rabbitmq/sendDirect.htm?name=gxx

观察server.log

2017-06-01 10:48:13,177 INFO  发送direct模式mq开始~
2017-06-01 10:48:13,179 INFO  发送第[1]条mq!
2017-06-01 10:48:13,866 INFO  发送第[2]条mq!
2017-06-01 10:48:13,866 INFO  发送第[3]条mq!
2017-06-01 10:48:13,866 INFO  发送第[4]条mq!
2017-06-01 10:48:13,867 INFO  发送第[5]条mq!
2017-06-01 10:48:13,867 INFO  发送direct模式mq结束~

观察client.log

2017-06-01 10:48:15,593 INFO  direct模式queue监听者[2]收到MQ内容:(Body:'{"index":2,"name":"gxx2"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=direct.exchange, receivedRoutingKey=server.to.client.direct.key, deliveryTag=1, messageCount=0])
2017-06-01 10:48:15,613 INFO  direct模式queue监听者[1]收到MQ内容:(Body:'{"index":1,"name":"gxx1"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=direct.exchange, receivedRoutingKey=server.to.client.direct.key, deliveryTag=1, messageCount=0])
2017-06-01 10:48:15,667 INFO  direct模式queue监听者[2]收到MQ内容:(Body:'{"index":3,"name":"gxx3"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=direct.exchange, receivedRoutingKey=server.to.client.direct.key, deliveryTag=2, messageCount=0])
2017-06-01 10:48:15,699 INFO  direct模式queue监听者[1]收到MQ内容:(Body:'{"index":4,"name":"gxx4"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=direct.exchange, receivedRoutingKey=server.to.client.direct.key, deliveryTag=2, messageCount=0])
2017-06-01 10:48:15,731 INFO  direct模式queue监听者[2]收到MQ内容:(Body:'{"index":5,"name":"gxx5"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=direct.exchange, receivedRoutingKey=server.to.client.direct.key, deliveryTag=3, messageCount=0])

测试fanout模式(发布/订阅模式)

访问

http://localhost:8080/server-web/rabbitmq/sendFanout.htm?name=gxx

观察server.log

2017-06-01 10:49:13,634 INFO  发送fanout模式(发布/订阅模式)mq开始~
2017-06-01 10:49:13,634 INFO  发送第[1]条FanoutMq!
2017-06-01 10:49:13,635 INFO  发送第[2]条FanoutMq!
2017-06-01 10:49:13,636 INFO  发送第[3]条FanoutMq!
2017-06-01 10:49:13,636 INFO  发送第[4]条FanoutMq!
2017-06-01 10:49:13,637 INFO  发送第[5]条FanoutMq!
2017-06-01 10:49:13,637 INFO  发送fanout模式(发布/订阅模式)mq结束~

观察client.log

2017-06-01 10:49:13,671 INFO  fanout模式(发布/订阅模式)queue监听者[1]收到MQ内容:(Body:'{"index":1,"name":"gxx1"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=1, messageCount=0])
2017-06-01 10:49:13,673 INFO  fanout模式(发布/订阅模式)queue监听者[2]收到MQ内容:(Body:'{"index":1,"name":"gxx1"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=1, messageCount=0])
2017-06-01 10:49:13,736 INFO  fanout模式(发布/订阅模式)queue监听者[2]收到MQ内容:(Body:'{"index":2,"name":"gxx2"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=2, messageCount=0])
2017-06-01 10:49:13,738 INFO  fanout模式(发布/订阅模式)queue监听者[1]收到MQ内容:(Body:'{"index":2,"name":"gxx2"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=2, messageCount=0])
2017-06-01 10:49:13,810 INFO  fanout模式(发布/订阅模式)queue监听者[1]收到MQ内容:(Body:'{"index":3,"name":"gxx3"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=3, messageCount=0])
2017-06-01 10:49:13,810 INFO  fanout模式(发布/订阅模式)queue监听者[2]收到MQ内容:(Body:'{"index":3,"name":"gxx3"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=3, messageCount=0])
2017-06-01 10:49:13,872 INFO  fanout模式(发布/订阅模式)queue监听者[2]收到MQ内容:(Body:'{"index":4,"name":"gxx4"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=4, messageCount=0])
2017-06-01 10:49:13,874 INFO  fanout模式(发布/订阅模式)queue监听者[1]收到MQ内容:(Body:'{"index":4,"name":"gxx4"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=4, messageCount=0])
2017-06-01 10:49:13,941 INFO  fanout模式(发布/订阅模式)queue监听者[2]收到MQ内容:(Body:'{"index":5,"name":"gxx5"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=5, messageCount=0])
2017-06-01 10:49:13,943 INFO  fanout模式(发布/订阅模式)queue监听者[1]收到MQ内容:(Body:'{"index":5,"name":"gxx5"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=5, messageCount=0])

测试topic模式(模糊匹配)

访问

http://localhost:8080/server-web/rabbitmq/sendTopic.htm?name=gxx

观察server.log

2017-06-01 10:50:10,863 INFO  发送topic模式(模糊匹配)mq开始~
2017-06-01 10:50:10,863 INFO  发送第[1]条TopicMq!
2017-06-01 10:50:10,864 INFO  发送第[2]条TopicMq!
2017-06-01 10:50:10,864 INFO  发送第[3]条TopicMq!
2017-06-01 10:50:10,864 INFO  发送第[4]条TopicMq!
2017-06-01 10:50:10,865 INFO  发送第[5]条TopicMq!
2017-06-01 10:50:10,865 INFO  发送topic模式(模糊匹配)mq结束~

观察client.log

2017-06-01 10:50:10,903 INFO  topic模式(模糊匹配)queue监听者[2]收到MQ内容:(Body:'{"index":2,"name":"gxx2"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=a.b.c.topic, deliveryTag=1, messageCount=0])
2017-06-01 10:50:10,904 INFO  topic模式(模糊匹配)queue监听者[1]收到MQ内容:(Body:'{"index":1,"name":"gxx1"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=a.b.c.topic, deliveryTag=1, messageCount=0])
2017-06-01 10:50:10,995 INFO  topic模式(模糊匹配)queue监听者[1]收到MQ内容:(Body:'{"index":3,"name":"gxx3"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=a.b.c.topic, deliveryTag=2, messageCount=0])
2017-06-01 10:50:10,995 INFO  topic模式(模糊匹配)queue监听者[2]收到MQ内容:(Body:'{"index":4,"name":"gxx4"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=a.b.c.topic, deliveryTag=2, messageCount=0])
2017-06-01 10:50:11,277 INFO  topic模式(模糊匹配)queue监听者[2]收到MQ内容:(Body:'{"index":5,"name":"gxx5"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=a.b.c.topic, deliveryTag=3, messageCount=0])

rabbitmq控制台观察

附源码

server.zip

client.zip

几个属性的解释

About queue:

1 queue-name

The queue name MAY be empty, in which case the server MUST create a new queue with a unique generated name and return this to the client in the Declare-Ok method.

2 passive

If set, the server will reply with Declare-Ok if the queue already exists with the same name, and raise an error if not.

3 durable

If set when creating a new queue, the queue will be marked as durable. Durable queues remain active when a server restarts. Non-durable queues (transient queues) are purged if/when a server restarts. Note that durable queues do not necessarily hold persistent messages, although it does not make sense to send persistent messages to a transient queue.

The server MUST recreate the durable queue after a restart.

The server MUST support both durable and transient queues.

4 exclusive

Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections are not allowed.

The server MUST support both exclusive (private) and non-exclusive (shared) queues.

The client MAY NOT attempt to use a queue that was declared as exclusive by another still-open connection. Error code: resource-locked

5 auto-delete

If set, the queue is deleted when all consumers have finished using it. The last consumer can be cancelled either explicitly or because its channel is closed. If there was no consumer ever on the queue, it won't be deleted. Applications can explicitly delete auto-delete queues using the Delete method as normal.

The server MUST ignore the auto-delete field if the queue already exists.

About exchange:

1 passive

If set, the server will reply with Declare-Ok if the exchange already exists with the same name, and raise an error if not.

2 durable

If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts.

3 auto-delete

If set, the exchange is deleted when all queues have finished using it.

The server SHOULD allow for a reasonable delay between the point when it determines that an exchange is not being used (or no longer used), and the point when it deletes the exchange. At the least it must allow a client to create an exchange and then bind a queue to it, with a small but non-zero delay between these two actions.

The server MUST ignore the auto-delete field if the exchange already exists.

4 internal

If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications.