<!-- apache common --> <dependency> <groupId>commons-pool</groupId> <artifactId>commons-pool</artifactId> <version>1.6</version> </dependency> <!-- jms --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.0.5.RELEASE</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.7.0</version> </dependency>
<!-- 数据库配置文件位置 --> <context:property-placeholder location="classpath:/jdbc.properties,classpath:/redis.properties,classpath:/memcached.properties,classpath:/mongodb.properties,classpath:/activemq.properties" />
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:p="http://www.springframework.org/schema/p" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:security="http://www.springframework.org/schema/security" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd"> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${activemq.server.url}"/> </bean> <!-- 连接池 --> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="connectionFactory" ref="targetConnectionFactory"/> <property name="maxConnections" value="10"/> </bean> <!-- 连接工厂 --> <bean id="jmsConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="pooledConnectionFactory"/> </bean> <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <property name="connectionFactory" ref="jmsConnectionFactory"/> </bean> <!--这个是队列目的地,点对点(Point-to-Point)--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>RECORD_QUEUE</value> </constructor-arg> </bean> <!--这个是主题目的地,发布/订阅(Publish/Subscribe)--> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="RECORD_TOPIC"/> </bean> <!-- 消息监听器 --> <!-- 如果多个监听器监听点对点(Point-to-Point)目的地,根据日志可以看出,只有一个监听器会监听到消息 --> <!-- 如果多个监听器监听发布/订阅(Publish/Subscribe)目的地,根据日志可以看出,多个监听器都会监听到消息 --> <!-- 当然一个应用针对一个目的地一般只会配置一个监听器 --> <bean id="consumerMessageListener" class="com.gxx.record.jms.ConsumerMessageListener"/> <!-- 消息监听容器 --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="jmsConnectionFactory" /> <property name="destination" ref="queueDestination" /><!-- 指定目的地 --> <property name="messageListener" ref="consumerMessageListener" /> </bean> <!-- 配置多个监听器 --> <!-- 消息监听器 --> <!-- <bean id="consumerMessageListener_2" class="com.gxx.record.jms.ConsumerMessageListener"/> --> <!-- 消息监听容器 --> <!-- <bean id="jmsContainer_2" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="jmsConnectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="consumerMessageListener_2" /> </bean> --> </beans>
# activemq settings activemq.server.url=tcp://121.43.104.34:61616
package com.gxx.record.service; import javax.jms.Destination; /** * <dl> * <dt><b>Title:</b></dt> * <dd> * JMS服务接口 * </dd> * <dt><b>Description:</b></dt> * <dd> * <p>none * </dd> * </dl> * * @author Administrator * @version 1.0, 2015年9月24日 * @since record * */ public interface JmsService { /** * 想默认目的地发送消息 * @param message */ public void sendMessage(final String message); /** * 指定目的地发送消息 * @param destination * @param message */ public void sendMessage(final Destination destination, final String message); }
package com.gxx.record.service.impl; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service; import com.gxx.record.service.JmsService; /** * <dl> * <dt><b>Title:</b></dt> * <dd> * 用户服务实现类 * </dd> * <dt><b>Description:</b></dt> * <dd> * <p>none * </dd> * </dl> * * @author Administrator * @version 1.0, 2015年6月18日 * @since record * */ @Service("jmsService") public class JmsServiceImpl implements JmsService { /** * 日志处理器 */ Logger logger = Logger.getLogger(JmsServiceImpl.class); @Autowired private JmsTemplate jmsTemplate; /** * 点对点(Point-to-Point)目的地 */ @Autowired Destination queueDestination; /** * 发布/订阅(Publish/Subscribe)目的地 */ @Autowired Destination topicDestination; @Override /** * 想默认目的地发送消息 * @param message */ public void sendMessage(final String message) { logger.info("生产者向默认目的地发了一个消息:" + message); jmsTemplate.send(queueDestination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } @Override /** * 指定目的地发送消息 * @param destination * @param message */ public void sendMessage(final Destination destination, final String message){ logger.info("生产者向指定目的地发了一个消息:" + message); jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } }
package com.gxx.record.jms; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.apache.log4j.Logger; /** * * <dl> * <dt><b>Title:</b></dt> * <dd> * 消息监听器 * </dd> * <dt><b>Description:</b></dt> * <dd> * <p>none * </dd> * </dl> * * @author Administrator * @version 1.0, 2015年9月24日 * @since record * */ public class ConsumerMessageListener implements MessageListener { /** * 日志处理器 */ Logger logger = Logger.getLogger(ConsumerMessageListener.class); @Override public void onMessage(Message message) { //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换 TextMessage textMsg = (TextMessage) message; try { logger.info("消息监听器接收内容:" + textMsg.getText()); } catch (JMSException e) { logger.error("消息监听器接收内容异常发生!", e); } } }
package com.gxx.record.web.jms; import java.util.HashMap; import java.util.Map; import javax.servlet.http.HttpServletRequest; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import com.gxx.record.service.JmsService; /** * UserController * * @author gxx */ @Controller @RequestMapping("/jms/") public class JmsController { /** * 日志处理器 */ private final Logger logger = Logger.getLogger(JmsController.class); @Autowired private JmsService jmsService; /** * 发送消息 * @param request * @return */ @RequestMapping(value = "/send",produces="application/json") public @ResponseBody Map<String, Object> registJsp(HttpServletRequest request) { String content = request.getParameter("content"); logger.info("发送消息:内容=[" + content + "]"); jmsService.sendMessage(content); Map<String, Object> resultMap = new HashMap<String, Object>(); resultMap.put("success", true); return resultMap; } }
请求地址Controller:http://ip:port/record/jms/send.htm?content=hello baby~~~,日志打出如下:
2015-09-24 16:25:08,920 [http-80-exec-7] INFO [com.gxx.record.web.jms.JmsController:40] - 发送消息:内容=[hello baby~~~] 2015-09-24 16:25:08,921 [http-80-exec-7] INFO [com.gxx.record.service.impl.JmsServiceImpl:49] - 生产者发了一个消息:hello baby~~~ 2015-09-24 16:25:09,100 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容:hello baby~~~
如果监听目的地为点对点(Point-to-Point),则只会有一个监听器监听到,日志如下:
2015-09-24 16:25:08,920 [http-80-exec-7] INFO [com.gxx.record.web.jms.JmsController:40] - 发送消息:内容=[hello baby~~~] 2015-09-24 16:25:08,921 [http-80-exec-7] INFO [com.gxx.record.service.impl.JmsServiceImpl:49] - 生产者发了一个消息:hello baby~~~ 2015-09-24 16:25:09,100 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容:hello baby~~~
如果监听目的地为发布/订阅(Publish/Subscribe),则多个监听器都会监听到,日志如下:
2015-09-24 16:32:17,791 [http-80-exec-3] INFO [com.gxx.record.web.jms.JmsController:40] - 发送消息:内容=[hello baby~~~] 2015-09-24 16:32:17,791 [http-80-exec-3] INFO [com.gxx.record.service.impl.JmsServiceImpl:52] - 生产者向默认目的地发了一个消息:hello baby~~~ 2015-09-24 16:32:17,922 [jmsContainer_2-1] INFO [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容:hello baby~~~ 2015-09-24 16:32:17,922 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容:hello baby~~~
修改代码,使得处理一条消息需要睡眠3秒钟,观察日志,显示每条消息依次被处理,不会一时间涌入,造成系统压力。
package com.gxx.record.jms; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.apache.log4j.Logger; /** * * <dl> * <dt><b>Title:</b></dt> * <dd> * 消息监听器 * </dd> * <dt><b>Description:</b></dt> * <dd> * <p>none * </dd> * </dl> * * @author Administrator * @version 1.0, 2015年9月24日 * @since record * */ public class ConsumerMessageListener implements MessageListener { /** * 日志处理器 */ Logger logger = Logger.getLogger(ConsumerMessageListener.class); @Override public void onMessage(Message message) { //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换 TextMessage textMsg = (TextMessage) message; try { logger.info("消息监听器接收内容开始===:" + textMsg.getText()); Thread.currentThread(); Thread.sleep(3000); logger.info("消息监听器接收内容结束===:" + textMsg.getText()); } catch (JMSException e) { logger.error("消息监听器接收内容异常发生!", e); } catch (Exception e) { logger.error("睡眠3秒,异常发生!"); } } }
2015-09-25 12:56:19,675 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容开始===:发送消息~~~ 2015-09-25 12:56:22,676 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:43] - 消息监听器接收内容结束===:发送消息~~~ 2015-09-25 12:56:22,676 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容开始===:发送消息~~~ 2015-09-25 12:56:25,676 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:43] - 消息监听器接收内容结束===:发送消息~~~ 2015-09-25 12:56:25,676 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容开始===:发送消息~~~ 2015-09-25 12:56:28,677 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:43] - 消息监听器接收内容结束===:发送消息~~~