====== spring整合activemq ====== ===== 加入pom依赖 ===== commons-pool commons-pool 1.6 org.springframework spring-jms 4.0.5.RELEASE org.apache.activemq activemq-core 5.7.0 org.apache.activemq activemq-pool 5.7.0 ===== application-context.xml ===== ===== application-jms-activemq.xml ===== RECORD_QUEUE ===== activemq.properties ===== # activemq settings activemq.server.url=tcp://121.43.104.34:61616 ===== JmsService.java ===== package com.gxx.record.service; import javax.jms.Destination; /** *
*
Title:
*
* JMS服务接口 *
*
Description:
*
*

none *

*
* * @author Administrator * @version 1.0, 2015年9月24日 * @since record * */ public interface JmsService { /** * 想默认目的地发送消息 * @param message */ public void sendMessage(final String message); /** * 指定目的地发送消息 * @param destination * @param message */ public void sendMessage(final Destination destination, final String message); }
===== JmsServiceImpl.java ===== package com.gxx.record.service.impl; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service; import com.gxx.record.service.JmsService; /** *
*
Title:
*
* 用户服务实现类 *
*
Description:
*
*

none *

*
* * @author Administrator * @version 1.0, 2015年6月18日 * @since record * */ @Service("jmsService") public class JmsServiceImpl implements JmsService { /** * 日志处理器 */ Logger logger = Logger.getLogger(JmsServiceImpl.class); @Autowired private JmsTemplate jmsTemplate; /** * 点对点(Point-to-Point)目的地 */ @Autowired Destination queueDestination; /** * 发布/订阅(Publish/Subscribe)目的地 */ @Autowired Destination topicDestination; @Override /** * 想默认目的地发送消息 * @param message */ public void sendMessage(final String message) { logger.info("生产者向默认目的地发了一个消息:" + message); jmsTemplate.send(queueDestination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } @Override /** * 指定目的地发送消息 * @param destination * @param message */ public void sendMessage(final Destination destination, final String message){ logger.info("生产者向指定目的地发了一个消息:" + message); jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } }
===== ConsumerMessageListener.java ===== package com.gxx.record.jms; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.apache.log4j.Logger; /** * *
*
Title:
*
* 消息监听器 *
*
Description:
*
*

none *

*
* * @author Administrator * @version 1.0, 2015年9月24日 * @since record * */ public class ConsumerMessageListener implements MessageListener { /** * 日志处理器 */ Logger logger = Logger.getLogger(ConsumerMessageListener.class); @Override public void onMessage(Message message) { //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换 TextMessage textMsg = (TextMessage) message; try { logger.info("消息监听器接收内容:" + textMsg.getText()); } catch (JMSException e) { logger.error("消息监听器接收内容异常发生!", e); } } }
===== JmsController.java ===== package com.gxx.record.web.jms; import java.util.HashMap; import java.util.Map; import javax.servlet.http.HttpServletRequest; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import com.gxx.record.service.JmsService; /** * UserController * * @author gxx */ @Controller @RequestMapping("/jms/") public class JmsController { /** * 日志处理器 */ private final Logger logger = Logger.getLogger(JmsController.class); @Autowired private JmsService jmsService; /** * 发送消息 * @param request * @return */ @RequestMapping(value = "/send",produces="application/json") public @ResponseBody Map registJsp(HttpServletRequest request) { String content = request.getParameter("content"); logger.info("发送消息:内容=[" + content + "]"); jmsService.sendMessage(content); Map resultMap = new HashMap(); resultMap.put("success", true); return resultMap; } } ===== 发送消息 ===== 请求地址Controller:http://ip:port/record/jms/send.htm?content=hello baby~~~,日志打出如下: 2015-09-24 16:25:08,920 [http-80-exec-7] INFO [com.gxx.record.web.jms.JmsController:40] - 发送消息:内容=[hello baby~~~] 2015-09-24 16:25:08,921 [http-80-exec-7] INFO [com.gxx.record.service.impl.JmsServiceImpl:49] - 生产者发了一个消息:hello baby~~~ 2015-09-24 16:25:09,100 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容:hello baby~~~ ===== 配置多个监听器 ===== 如果监听目的地为点对点(Point-to-Point),则只会有一个监听器监听到,日志如下: 2015-09-24 16:25:08,920 [http-80-exec-7] INFO [com.gxx.record.web.jms.JmsController:40] - 发送消息:内容=[hello baby~~~] 2015-09-24 16:25:08,921 [http-80-exec-7] INFO [com.gxx.record.service.impl.JmsServiceImpl:49] - 生产者发了一个消息:hello baby~~~ 2015-09-24 16:25:09,100 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容:hello baby~~~ 如果监听目的地为发布/订阅(Publish/Subscribe),则多个监听器都会监听到,日志如下: 2015-09-24 16:32:17,791 [http-80-exec-3] INFO [com.gxx.record.web.jms.JmsController:40] - 发送消息:内容=[hello baby~~~] 2015-09-24 16:32:17,791 [http-80-exec-3] INFO [com.gxx.record.service.impl.JmsServiceImpl:52] - 生产者向默认目的地发了一个消息:hello baby~~~ 2015-09-24 16:32:17,922 [jmsContainer_2-1] INFO [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容:hello baby~~~ 2015-09-24 16:32:17,922 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容:hello baby~~~ ===== 顺序读取消息 ===== 修改代码,使得处理一条消息需要睡眠3秒钟,观察日志,显示每条消息依次被处理,不会一时间涌入,造成系统压力。 package com.gxx.record.jms; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.apache.log4j.Logger; /** * *
*
Title:
*
* 消息监听器 *
*
Description:
*
*

none *

*
* * @author Administrator * @version 1.0, 2015年9月24日 * @since record * */ public class ConsumerMessageListener implements MessageListener { /** * 日志处理器 */ Logger logger = Logger.getLogger(ConsumerMessageListener.class); @Override public void onMessage(Message message) { //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换 TextMessage textMsg = (TextMessage) message; try { logger.info("消息监听器接收内容开始===:" + textMsg.getText()); Thread.currentThread(); Thread.sleep(3000); logger.info("消息监听器接收内容结束===:" + textMsg.getText()); } catch (JMSException e) { logger.error("消息监听器接收内容异常发生!", e); } catch (Exception e) { logger.error("睡眠3秒,异常发生!"); } } }
2015-09-25 12:56:19,675 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容开始===:发送消息~~~ 2015-09-25 12:56:22,676 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:43] - 消息监听器接收内容结束===:发送消息~~~ 2015-09-25 12:56:22,676 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容开始===:发送消息~~~ 2015-09-25 12:56:25,676 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:43] - 消息监听器接收内容结束===:发送消息~~~ 2015-09-25 12:56:25,676 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容开始===:发送消息~~~ 2015-09-25 12:56:28,677 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:43] - 消息监听器接收内容结束===:发送消息~~~