commons-pool
commons-pool
1.6
org.springframework
spring-jms
4.0.5.RELEASE
org.apache.activemq
activemq-core
5.7.0
org.apache.activemq
activemq-pool
5.7.0
===== application-context.xml =====
===== application-jms-activemq.xml =====
RECORD_QUEUE
===== activemq.properties =====
# activemq settings
activemq.server.url=tcp://121.43.104.34:61616
===== JmsService.java =====
package com.gxx.record.service;
import javax.jms.Destination;
/**
*
* - Title:
* -
* JMS服务接口
*
* - Description:
* -
*
none
*
*
*
* @author Administrator
* @version 1.0, 2015年9月24日
* @since record
*
*/
public interface JmsService {
/**
* 想默认目的地发送消息
* @param message
*/
public void sendMessage(final String message);
/**
* 指定目的地发送消息
* @param destination
* @param message
*/
public void sendMessage(final Destination destination, final String message);
}
===== JmsServiceImpl.java =====
package com.gxx.record.service.impl;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
import com.gxx.record.service.JmsService;
/**
*
* - Title:
* -
* 用户服务实现类
*
* - Description:
* -
*
none
*
*
*
* @author Administrator
* @version 1.0, 2015年6月18日
* @since record
*
*/
@Service("jmsService")
public class JmsServiceImpl implements JmsService {
/**
* 日志处理器
*/
Logger logger = Logger.getLogger(JmsServiceImpl.class);
@Autowired
private JmsTemplate jmsTemplate;
/**
* 点对点(Point-to-Point)目的地
*/
@Autowired
Destination queueDestination;
/**
* 发布/订阅(Publish/Subscribe)目的地
*/
@Autowired
Destination topicDestination;
@Override
/**
* 想默认目的地发送消息
* @param message
*/
public void sendMessage(final String message) {
logger.info("生产者向默认目的地发了一个消息:" + message);
jmsTemplate.send(queueDestination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
});
}
@Override
/**
* 指定目的地发送消息
* @param destination
* @param message
*/
public void sendMessage(final Destination destination, final String message){
logger.info("生产者向指定目的地发了一个消息:" + message);
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
});
}
}
===== ConsumerMessageListener.java =====
package com.gxx.record.jms;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.log4j.Logger;
/**
*
*
* - Title:
* -
* 消息监听器
*
* - Description:
* -
*
none
*
*
*
* @author Administrator
* @version 1.0, 2015年9月24日
* @since record
*
*/
public class ConsumerMessageListener implements MessageListener {
/**
* 日志处理器
*/
Logger logger = Logger.getLogger(ConsumerMessageListener.class);
@Override
public void onMessage(Message message) {
//这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换
TextMessage textMsg = (TextMessage) message;
try {
logger.info("消息监听器接收内容:" + textMsg.getText());
} catch (JMSException e) {
logger.error("消息监听器接收内容异常发生!", e);
}
}
}
===== JmsController.java =====
package com.gxx.record.web.jms;
import java.util.HashMap;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import com.gxx.record.service.JmsService;
/**
* UserController
*
* @author gxx
*/
@Controller
@RequestMapping("/jms/")
public class JmsController {
/**
* 日志处理器
*/
private final Logger logger = Logger.getLogger(JmsController.class);
@Autowired
private JmsService jmsService;
/**
* 发送消息
* @param request
* @return
*/
@RequestMapping(value = "/send",produces="application/json")
public @ResponseBody Map registJsp(HttpServletRequest request) {
String content = request.getParameter("content");
logger.info("发送消息:内容=[" + content + "]");
jmsService.sendMessage(content);
Map resultMap = new HashMap();
resultMap.put("success", true);
return resultMap;
}
}
===== 发送消息 =====
请求地址Controller:http://ip:port/record/jms/send.htm?content=hello baby~~~,日志打出如下:
2015-09-24 16:25:08,920 [http-80-exec-7] INFO [com.gxx.record.web.jms.JmsController:40] - 发送消息:内容=[hello baby~~~]
2015-09-24 16:25:08,921 [http-80-exec-7] INFO [com.gxx.record.service.impl.JmsServiceImpl:49] - 生产者发了一个消息:hello baby~~~
2015-09-24 16:25:09,100 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容:hello baby~~~
===== 配置多个监听器 =====
如果监听目的地为点对点(Point-to-Point),则只会有一个监听器监听到,日志如下:
2015-09-24 16:25:08,920 [http-80-exec-7] INFO [com.gxx.record.web.jms.JmsController:40] - 发送消息:内容=[hello baby~~~]
2015-09-24 16:25:08,921 [http-80-exec-7] INFO [com.gxx.record.service.impl.JmsServiceImpl:49] - 生产者发了一个消息:hello baby~~~
2015-09-24 16:25:09,100 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容:hello baby~~~
如果监听目的地为发布/订阅(Publish/Subscribe),则多个监听器都会监听到,日志如下:
2015-09-24 16:32:17,791 [http-80-exec-3] INFO [com.gxx.record.web.jms.JmsController:40] - 发送消息:内容=[hello baby~~~]
2015-09-24 16:32:17,791 [http-80-exec-3] INFO [com.gxx.record.service.impl.JmsServiceImpl:52] - 生产者向默认目的地发了一个消息:hello baby~~~
2015-09-24 16:32:17,922 [jmsContainer_2-1] INFO [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容:hello baby~~~
2015-09-24 16:32:17,922 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容:hello baby~~~
===== 顺序读取消息 =====
修改代码,使得处理一条消息需要睡眠3秒钟,观察日志,显示每条消息依次被处理,不会一时间涌入,造成系统压力。
package com.gxx.record.jms;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.log4j.Logger;
/**
*
*
* - Title:
* -
* 消息监听器
*
* - Description:
* -
*
none
*
*
*
* @author Administrator
* @version 1.0, 2015年9月24日
* @since record
*
*/
public class ConsumerMessageListener implements MessageListener {
/**
* 日志处理器
*/
Logger logger = Logger.getLogger(ConsumerMessageListener.class);
@Override
public void onMessage(Message message) {
//这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换
TextMessage textMsg = (TextMessage) message;
try {
logger.info("消息监听器接收内容开始===:" + textMsg.getText());
Thread.currentThread();
Thread.sleep(3000);
logger.info("消息监听器接收内容结束===:" + textMsg.getText());
} catch (JMSException e) {
logger.error("消息监听器接收内容异常发生!", e);
} catch (Exception e) {
logger.error("睡眠3秒,异常发生!");
}
}
}
2015-09-25 12:56:19,675 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容开始===:发送消息~~~
2015-09-25 12:56:22,676 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:43] - 消息监听器接收内容结束===:发送消息~~~
2015-09-25 12:56:22,676 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容开始===:发送消息~~~
2015-09-25 12:56:25,676 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:43] - 消息监听器接收内容结束===:发送消息~~~
2015-09-25 12:56:25,676 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容开始===:发送消息~~~
2015-09-25 12:56:28,677 [jmsContainer-1] INFO [com.gxx.record.jms.ConsumerMessageListener:43] - 消息监听器接收内容结束===:发送消息~~~