用户工具

站点工具


分享:技术:jms:spring整合rabbitmq

这是本文档旧的修订版!


spring整合rabbitmq

官网

三种模式

  • direct模式

  • fanout模式(发布/订阅模式)

  • topic模式(模糊匹配)

server端

pom.xml

pom.xml
<dependency>
	<groupId>org.springframework.amqp</groupId>
	<artifactId>spring-rabbit</artifactId>
	<version>1.3.5.RELEASE</version>
</dependency>

application-mq-rabbit.xml

application-mq-rabbit.xml
<?xml version="1.0" encoding="UTF-8"?>
 
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:task="http://www.springframework.org/schema/task"
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
	  http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
	  http://www.springframework.org/schema/context
	  http://www.springframework.org/schema/context/spring-context.xsd
	  http://www.springframework.org/schema/integration/ip
	  http://www.springframework.org/schema/integration/ip/spring-integration-ip-2.0.xsd
	  http://www.springframework.org/schema/integration
	  http://www.springframework.org/schema/integration/spring-integration.xsd
	  http://www.springframework.org/schema/task
	  http://www.springframework.org/schema/task/spring-task-3.0.xsd
	  http://www.springframework.org/schema/beans
	  http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
	  http://www.springframework.org/schema/rabbit
	  http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
 
	<!-- mq连接服务配置 -->
	<rabbit:connection-factory id="connectionFactory"
				host="121.40.171.96"
				username="admin"
				password="admin123"
				port="5672" />
 
	<rabbit:admin connection-factory="connectionFactory" />
 
	<!-- direct模式开始 -->
	<!-- 定义template -->
	<rabbit:template id="directTemplate" 
		connection-factory="connectionFactory"
		exchange="direct.exchange" />
 
	<!-- 定义queue -->
	<rabbit:queue name="server.to.client.direct"
		durable="true" 
		auto-delete="false" 
		exclusive="false" />
 
	<!-- 定义exchange -->
	<rabbit:direct-exchange name="direct.exchange"
		durable="true" 
		auto-delete="false">
 
		<rabbit:bindings>
			<!-- 定义binding -->
			<rabbit:binding queue="server.to.client.direct"
				key="server.to.client.direct.key" />
		</rabbit:bindings>
 
	</rabbit:direct-exchange>
	<!-- direct模式结束 -->
 
	<!-- fanout模式(发布/订阅模式)开始 -->
	<!-- 定义template -->
	<rabbit:template id="fanoutTemplate" 
		connection-factory="connectionFactory"
		exchange="fanout.exchange" />
 
	<!-- 定义queue -->
	<rabbit:queue name="server.to.client.fanout1" durable="true" />
	<rabbit:queue name="server.to.client.fanout2" durable="true" />
 
	<!-- 定义exchange -->
	<rabbit:fanout-exchange name="fanout.exchange" xmlns="http://www.springframework.org/schema/rabbit" durable="true">
		<rabbit:bindings>
			<!-- 定义binding -->
			<rabbit:binding queue="server.to.client.fanout1"></rabbit:binding> 
			<rabbit:binding queue="server.to.client.fanout2"></rabbit:binding> 
		</rabbit:bindings> 
	</rabbit:fanout-exchange>
	<!-- fanout模式(发布/订阅模式)结束 -->
 
	<!-- topic模式(模糊匹配)开始 -->
	<!-- 定义template -->
	<rabbit:template id="topicTemplate" 
		connection-factory="connectionFactory"
		exchange="topic.exchange" />
 
	<!-- 定义queue -->
	<rabbit:queue name="server.to.client.topic" durable="true" />
 
	<!-- 定义exchange -->
	<rabbit:topic-exchange name="topic.exchange" xmlns="http://www.springframework.org/schema/rabbit" durable="true">
		<bindings>
			<!-- 定义binding -->
			<binding queue="server.to.client.topic" pattern="*.*.*.topic" />
		</bindings>
	</rabbit:topic-exchange>
	<!-- topic模式(模糊匹配)结束 -->
 
</beans>

RabbitmqController.java

RabbitmqController.java
package com.gxx.dubbo.server.web;
 
import java.util.HashMap;
import java.util.Map;
 
import javax.servlet.http.HttpServletRequest;
 
import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
 
import com.alibaba.fastjson.JSON;
 
/**
 * rabbitmq控制类
 * @author Gxx
 */
@Controller
@RequestMapping("/rabbitmq/")
public class RabbitmqController {
 
	/**
	 * 日志处理器
	 */
	private final Logger logger = Logger.getLogger(RabbitmqController.class);
 
	/**
	 * direct mq执行器
	 */
	@Autowired
	private AmqpTemplate directTemplate;
 
	/**
	 * fanout mq执行器
	 */
	@Autowired
	private AmqpTemplate fanoutTemplate;
 
	/**
	 * topic mq执行器
	 */
	@Autowired
	private AmqpTemplate topicTemplate;
 
	/**
	 * 发送direct模式mq
	 * @return
	 */
	@RequestMapping(value = "/sendDirect")
	public @ResponseBody String sendDirect(HttpServletRequest request) {
		logger.info("发送direct模式mq开始~");
		for(int i=0;i<5;i++) {
			logger.info("发送第[" + (i+1) + "]条mq!");
			/**
			 * 组织map
			 */
			Map<String, Object> map = new HashMap<>();
			map.put("index", i+1);
			map.put("name", request.getParameter("name") + (i+1));
			/**
			 * 转json串
			 */
			String jsonString  = JSON.toJSONString(map);
			/**
			 * 发送mq
			 */
			directTemplate.convertAndSend("server.to.client.direct.key", jsonString);
		}
		logger.info("发送direct模式mq结束~");
		return "发送direct模式mq结束~";
	}
 
	/**
	 * 发送fanout模式(发布/订阅模式)mq
	 * @return
	 */
	@RequestMapping(value = "/sendFanout")
	public @ResponseBody String sendFanout(HttpServletRequest request) {
		logger.info("发送fanout模式(发布/订阅模式)mq开始~");
		for(int i=0;i<5;i++) {
			logger.info("发送第[" + (i+1) + "]条FanoutMq!");
			/**
			 * 组织map
			 */
			Map<String, Object> map = new HashMap<>();
			map.put("index", i+1);
			map.put("name", request.getParameter("name") + (i+1));
			/**
			 * 转json串
			 */
			String jsonString  = JSON.toJSONString(map);
			/**
			 * 发送mq
			 */
			fanoutTemplate.convertAndSend(jsonString);
		}
		logger.info("发送fanout模式(发布/订阅模式)mq结束~");
		return "发送fanout模式(发布/订阅模式)mq结束~";
	}
 
	/**
	 * 发送topic模式(模糊匹配)mq
	 * @return
	 */
	@RequestMapping(value = "/sendTopic")
	public @ResponseBody String sendTopic(HttpServletRequest request) {
		logger.info("发送topic模式(模糊匹配)mq开始~");
		for(int i=0;i<5;i++) {
			logger.info("发送第[" + (i+1) + "]条TopicMq!");
			/**
			 * 组织map
			 */
			Map<String, Object> map = new HashMap<>();
			map.put("index", i+1);
			map.put("name", request.getParameter("name") + (i+1));
			/**
			 * 转json串
			 */
			String jsonString  = JSON.toJSONString(map);
			/**
			 * 发送mq
			 */
			topicTemplate.convertAndSend("a.b.c.topic", jsonString);
		}
		logger.info("发送topic模式(模糊匹配)mq结束~");
		return "发送topic模式(模糊匹配)mq结束~";
	}
}

client端

pom.xml

pom.xml
<dependency>
	<groupId>org.springframework.amqp</groupId>
	<artifactId>spring-rabbit</artifactId>
	<version>1.3.5.RELEASE</version>
</dependency>

application-mq-rabbit.xml

application-mq-rabbit.xml
<?xml version="1.0" encoding="UTF-8"?>
 
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:task="http://www.springframework.org/schema/task"
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
	  http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
	  http://www.springframework.org/schema/context
	  http://www.springframework.org/schema/context/spring-context.xsd
	  http://www.springframework.org/schema/integration/ip
	  http://www.springframework.org/schema/integration/ip/spring-integration-ip-2.0.xsd
	  http://www.springframework.org/schema/integration
	  http://www.springframework.org/schema/integration/spring-integration.xsd
	  http://www.springframework.org/schema/task
	  http://www.springframework.org/schema/task/spring-task-3.0.xsd
	  http://www.springframework.org/schema/beans
	  http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
	  http://www.springframework.org/schema/rabbit
	  http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
 
	<!-- mq连接服务配置 -->
	<rabbit:connection-factory id="connectionFactory"
				host="121.40.171.96"
				username="admin"
				password="admin123"
				port="5672" />
 
	<rabbit:admin connection-factory="connectionFactory" />
 
	<!-- direct模式queue -->
	<rabbit:queue name="server.to.client.direct" durable="true" 
		auto-delete="false" exclusive="false" />
 
	<!-- fanout模式(发布/订阅模式)queue -->
	<rabbit:queue name="server.to.client.fanout1" durable="true" />
	<rabbit:queue name="server.to.client.fanout2" durable="true" />
 
	<!-- topic模式(模糊匹配)queue -->
	<rabbit:queue name="server.to.client.topic" durable="true" />
 
	<!-- 配置mq监听者 -->
	<rabbit:listener-container
		connection-factory="connectionFactory"
		acknowledge="auto">
		<!-- direct模式queue监听者 -->
		<rabbit:listener queues="server.to.client.direct" ref="serverToClientDirectListener1" />
		<rabbit:listener queues="server.to.client.direct" ref="serverToClientDirectListener2" />
		<!-- fanout模式(发布/订阅模式)queue监听者 -->
		<rabbit:listener queues="server.to.client.fanout1" ref="serverToClientFanoutListener1" />
		<rabbit:listener queues="server.to.client.fanout2" ref="serverToClientFanoutListener2" />
		<!-- topic模式(模糊匹配)queue监听者 -->
		<rabbit:listener queues="server.to.client.topic" ref="serverToClientTopicListener1" />
		<rabbit:listener queues="server.to.client.topic" ref="serverToClientTopicListener2" />
	</rabbit:listener-container>
 
</beans>

ServerToClientDirectListener1.java

ServerToClientDirectListener1.java
package com.gxx.dubbo.client.biz.impl;
 
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Service;
 
/**
 * direct模式queue监听者
 * @author Gxx
 */
@Service(value = "serverToClientDirectListener1")
public class ServerToClientDirectListener1 implements MessageListener {
 
	/**
	 * 日志处理器
	 */
	private final Logger logger = Logger.getLogger(ServerToClientDirectListener1.class);
 
	@Override
	public void onMessage(Message message) {
		logger.info("direct模式queue监听者[1]收到MQ内容:" + message);
	}
 
}

ServerToClientDirectListener2.java

ServerToClientDirectListener2.java
package com.gxx.dubbo.client.biz.impl;
 
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Service;
 
/**
 * direct模式queue监听者
 * @author Gxx
 */
@Service(value = "serverToClientDirectListener2")
public class ServerToClientDirectListener2 implements MessageListener {
 
	/**
	 * 日志处理器
	 */
	private final Logger logger = Logger.getLogger(ServerToClientDirectListener2.class);
 
	@Override
	public void onMessage(Message message) {
		logger.info("direct模式queue监听者[2]收到MQ内容:" + message);
	}
 
}

ServerToClientFanoutListener1.java

ServerToClientFanoutListener1.java
package com.gxx.dubbo.client.biz.impl;
 
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Service;
 
/**
 * fanout模式(发布/订阅模式)queue监听者
 * @author Gxx
 */
@Service(value = "serverToClientFanoutListener1")
public class ServerToClientFanoutListener1 implements MessageListener {
 
	/**
	 * 日志处理器
	 */
	private final Logger logger = Logger.getLogger(ServerToClientFanoutListener1.class);
 
	@Override
	public void onMessage(Message message) {
		logger.info("fanout模式(发布/订阅模式)queue监听者[1]收到MQ内容:" + message);
	}
 
}

ServerToClientFanoutListener2.java

ServerToClientFanoutListener2.java
package com.gxx.dubbo.client.biz.impl;
 
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Service;
 
/**
 * fanout模式(发布/订阅模式)queue监听者
 * @author Gxx
 */
@Service(value = "serverToClientFanoutListener2")
public class ServerToClientFanoutListener2 implements MessageListener {
 
	/**
	 * 日志处理器
	 */
	private final Logger logger = Logger.getLogger(ServerToClientFanoutListener2.class);
 
	@Override
	public void onMessage(Message message) {
		logger.info("fanout模式(发布/订阅模式)queue监听者[2]收到MQ内容:" + message);
	}
 
}

ServerToClientTopicListener1.java

ServerToClientTopicListener1.java
package com.gxx.dubbo.client.biz.impl;
 
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Service;
 
/**
 * topic模式(模糊匹配)queue监听者
 * @author Gxx
 */
@Service(value = "serverToClientTopicListener1")
public class ServerToClientTopicListener1 implements MessageListener {
 
	/**
	 * 日志处理器
	 */
	private final Logger logger = Logger.getLogger(ServerToClientTopicListener1.class);
 
	@Override
	public void onMessage(Message message) {
		logger.info("topic模式(模糊匹配)queue监听者[1]收到MQ内容:" + message);
	}
 
}

ServerToClientTopicListener2.java

ServerToClientTopicListener2.java
package com.gxx.dubbo.client.biz.impl;
 
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Service;
 
/**
 * topic模式(模糊匹配)queue监听者
 * @author Gxx
 */
@Service(value = "serverToClientTopicListener2")
public class ServerToClientTopicListener2 implements MessageListener {
 
	/**
	 * 日志处理器
	 */
	private final Logger logger = Logger.getLogger(ServerToClientTopicListener2.class);
 
	@Override
	public void onMessage(Message message) {
		logger.info("topic模式(模糊匹配)queue监听者[2]收到MQ内容:" + message);
	}
 
}

部署测试

部署server-web和client-web到tomcat

测试direct模式

访问

http://localhost:8080/server-web/rabbitmq/sendDirect.htm?name=gxx

观察server.log

2017-06-01 10:48:13,177 INFO  发送direct模式mq开始~
2017-06-01 10:48:13,179 INFO  发送第[1]条mq!
2017-06-01 10:48:13,866 INFO  发送第[2]条mq!
2017-06-01 10:48:13,866 INFO  发送第[3]条mq!
2017-06-01 10:48:13,866 INFO  发送第[4]条mq!
2017-06-01 10:48:13,867 INFO  发送第[5]条mq!
2017-06-01 10:48:13,867 INFO  发送direct模式mq结束~

观察client.log

2017-06-01 10:48:15,593 INFO  direct模式queue监听者[2]收到MQ内容:(Body:'{"index":2,"name":"gxx2"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=direct.exchange, receivedRoutingKey=server.to.client.direct.key, deliveryTag=1, messageCount=0])
2017-06-01 10:48:15,613 INFO  direct模式queue监听者[1]收到MQ内容:(Body:'{"index":1,"name":"gxx1"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=direct.exchange, receivedRoutingKey=server.to.client.direct.key, deliveryTag=1, messageCount=0])
2017-06-01 10:48:15,667 INFO  direct模式queue监听者[2]收到MQ内容:(Body:'{"index":3,"name":"gxx3"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=direct.exchange, receivedRoutingKey=server.to.client.direct.key, deliveryTag=2, messageCount=0])
2017-06-01 10:48:15,699 INFO  direct模式queue监听者[1]收到MQ内容:(Body:'{"index":4,"name":"gxx4"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=direct.exchange, receivedRoutingKey=server.to.client.direct.key, deliveryTag=2, messageCount=0])
2017-06-01 10:48:15,731 INFO  direct模式queue监听者[2]收到MQ内容:(Body:'{"index":5,"name":"gxx5"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=direct.exchange, receivedRoutingKey=server.to.client.direct.key, deliveryTag=3, messageCount=0])

测试fanout模式(发布/订阅模式)

访问

http://localhost:8080/server-web/rabbitmq/sendFanout.htm?name=gxx

观察server.log

2017-06-01 10:49:13,634 INFO  发送fanout模式(发布/订阅模式)mq开始~
2017-06-01 10:49:13,634 INFO  发送第[1]条FanoutMq!
2017-06-01 10:49:13,635 INFO  发送第[2]条FanoutMq!
2017-06-01 10:49:13,636 INFO  发送第[3]条FanoutMq!
2017-06-01 10:49:13,636 INFO  发送第[4]条FanoutMq!
2017-06-01 10:49:13,637 INFO  发送第[5]条FanoutMq!
2017-06-01 10:49:13,637 INFO  发送fanout模式(发布/订阅模式)mq结束~

观察client.log

2017-06-01 10:49:13,671 INFO  fanout模式(发布/订阅模式)queue监听者[1]收到MQ内容:(Body:'{"index":1,"name":"gxx1"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=1, messageCount=0])
2017-06-01 10:49:13,673 INFO  fanout模式(发布/订阅模式)queue监听者[2]收到MQ内容:(Body:'{"index":1,"name":"gxx1"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=1, messageCount=0])
2017-06-01 10:49:13,736 INFO  fanout模式(发布/订阅模式)queue监听者[2]收到MQ内容:(Body:'{"index":2,"name":"gxx2"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=2, messageCount=0])
2017-06-01 10:49:13,738 INFO  fanout模式(发布/订阅模式)queue监听者[1]收到MQ内容:(Body:'{"index":2,"name":"gxx2"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=2, messageCount=0])
2017-06-01 10:49:13,810 INFO  fanout模式(发布/订阅模式)queue监听者[1]收到MQ内容:(Body:'{"index":3,"name":"gxx3"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=3, messageCount=0])
2017-06-01 10:49:13,810 INFO  fanout模式(发布/订阅模式)queue监听者[2]收到MQ内容:(Body:'{"index":3,"name":"gxx3"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=3, messageCount=0])
2017-06-01 10:49:13,872 INFO  fanout模式(发布/订阅模式)queue监听者[2]收到MQ内容:(Body:'{"index":4,"name":"gxx4"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=4, messageCount=0])
2017-06-01 10:49:13,874 INFO  fanout模式(发布/订阅模式)queue监听者[1]收到MQ内容:(Body:'{"index":4,"name":"gxx4"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=4, messageCount=0])
2017-06-01 10:49:13,941 INFO  fanout模式(发布/订阅模式)queue监听者[2]收到MQ内容:(Body:'{"index":5,"name":"gxx5"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=5, messageCount=0])
2017-06-01 10:49:13,943 INFO  fanout模式(发布/订阅模式)queue监听者[1]收到MQ内容:(Body:'{"index":5,"name":"gxx5"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=fanout.exchange, receivedRoutingKey=, deliveryTag=5, messageCount=0])

测试topic模式(模糊匹配)

访问

http://localhost:8080/server-web/rabbitmq/sendTopic.htm?name=gxx

观察server.log

2017-06-01 10:50:10,863 INFO  发送topic模式(模糊匹配)mq开始~
2017-06-01 10:50:10,863 INFO  发送第[1]条TopicMq!
2017-06-01 10:50:10,864 INFO  发送第[2]条TopicMq!
2017-06-01 10:50:10,864 INFO  发送第[3]条TopicMq!
2017-06-01 10:50:10,864 INFO  发送第[4]条TopicMq!
2017-06-01 10:50:10,865 INFO  发送第[5]条TopicMq!
2017-06-01 10:50:10,865 INFO  发送topic模式(模糊匹配)mq结束~

观察client.log

2017-06-01 10:50:10,903 INFO  topic模式(模糊匹配)queue监听者[2]收到MQ内容:(Body:'{"index":2,"name":"gxx2"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=a.b.c.topic, deliveryTag=1, messageCount=0])
2017-06-01 10:50:10,904 INFO  topic模式(模糊匹配)queue监听者[1]收到MQ内容:(Body:'{"index":1,"name":"gxx1"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=a.b.c.topic, deliveryTag=1, messageCount=0])
2017-06-01 10:50:10,995 INFO  topic模式(模糊匹配)queue监听者[1]收到MQ内容:(Body:'{"index":3,"name":"gxx3"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=a.b.c.topic, deliveryTag=2, messageCount=0])
2017-06-01 10:50:10,995 INFO  topic模式(模糊匹配)queue监听者[2]收到MQ内容:(Body:'{"index":4,"name":"gxx4"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=a.b.c.topic, deliveryTag=2, messageCount=0])
2017-06-01 10:50:11,277 INFO  topic模式(模糊匹配)queue监听者[2]收到MQ内容:(Body:'{"index":5,"name":"gxx5"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=a.b.c.topic, deliveryTag=3, messageCount=0])

rabbitmq控台观察

附源码

分享/技术/jms/spring整合rabbitmq.1496287775.txt.gz · 最后更改: 2017/06/01 11:29 由 gxx