用户工具

站点工具


分享:技术:jms:spring整合activemq

spring整合activemq

加入pom依赖

pom.xml
<!-- apache common -->
<dependency>
	<groupId>commons-pool</groupId>
	<artifactId>commons-pool</artifactId>
	<version>1.6</version>
</dependency>
<!-- jms -->
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-jms</artifactId>
	<version>4.0.5.RELEASE</version>
</dependency>
<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-core</artifactId>
	<version>5.7.0</version>
</dependency>
<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-pool</artifactId>
	<version>5.7.0</version>
</dependency>

application-context.xml

application-context.xml
	<!-- 数据库配置文件位置 -->
	<context:property-placeholder location="classpath:/jdbc.properties,classpath:/redis.properties,classpath:/memcached.properties,classpath:/mongodb.properties,classpath:/activemq.properties" />

application-jms-activemq.xml

application-jms-activemq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:p="http://www.springframework.org/schema/p"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:jpa="http://www.springframework.org/schema/data/jpa"
	xmlns:security="http://www.springframework.org/schema/security"
	xsi:schemaLocation="
		http://www.springframework.org/schema/beans 
		http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">
 
	<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="${activemq.server.url}"/>
	</bean>
 
	<!-- 连接池 -->
	<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
		<property name="connectionFactory" ref="targetConnectionFactory"/>
		<property name="maxConnections" value="10"/>
	</bean>
 
	<!-- 连接工厂 -->
	<bean id="jmsConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
		<property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
	</bean>
 
	<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
		<property name="connectionFactory" ref="jmsConnectionFactory"/>
	</bean>
 
	<!--这个是队列目的地,点对点(Point-to-Point)-->
	<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg>
			<value>RECORD_QUEUE</value>
		</constructor-arg>
	</bean>
 
	<!--这个是主题目的地,发布/订阅(Publish/Subscribe)-->
	<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
		<constructor-arg value="RECORD_TOPIC"/>
	</bean>
 
	<!-- 消息监听器 -->
	<!-- 如果多个监听器监听点对点(Point-to-Point)目的地,根据日志可以看出,只有一个监听器会监听到消息 -->
	<!-- 如果多个监听器监听发布/订阅(Publish/Subscribe)目的地,根据日志可以看出,多个监听器都会监听到消息 -->
	<!-- 当然一个应用针对一个目的地一般只会配置一个监听器 -->
	<bean id="consumerMessageListener" class="com.gxx.record.jms.ConsumerMessageListener"/>
	<!-- 消息监听容器 -->
	<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsConnectionFactory" />
		<property name="destination" ref="queueDestination" /><!-- 指定目的地 -->
		<property name="messageListener" ref="consumerMessageListener" />
	</bean>
 
	<!-- 配置多个监听器 -->
	<!-- 消息监听器 -->
	<!-- <bean id="consumerMessageListener_2" class="com.gxx.record.jms.ConsumerMessageListener"/> -->
	<!-- 消息监听容器 -->
	<!-- <bean id="jmsContainer_2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsConnectionFactory" />
		<property name="destination" ref="queueDestination" />
		<property name="messageListener" ref="consumerMessageListener_2" />
	</bean> -->
 
</beans>

activemq.properties

activemq.properties
# activemq settings
activemq.server.url=tcp://121.43.104.34:61616

JmsService.java

JmsService.java
package com.gxx.record.service;
 
import javax.jms.Destination;
 
 
/**
 * <dl>
 *    <dt><b>Title:</b></dt>
 *    <dd>
 *    	JMS服务接口
 *    </dd>
 *    <dt><b>Description:</b></dt>
 *    <dd>
 *    	<p>none
 *    </dd>
 * </dl>
 *
 * @author Administrator
 * @version 1.0, 2015年9月24日
 * @since record
 *
 */
public interface JmsService {
	/**
	 * 想默认目的地发送消息
	 * @param message
	 */
	public void sendMessage(final String message);
 
	/**
	 * 指定目的地发送消息
	 * @param destination
	 * @param message
	 */
	public void sendMessage(final Destination destination, final String message);
}

JmsServiceImpl.java

JmsServiceImpl.java
package com.gxx.record.service.impl;
 
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
 
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
 
import com.gxx.record.service.JmsService;
 
/**
 * <dl>
 *	<dt><b>Title:</b></dt>
 *	<dd>
 *		用户服务实现类
 *	</dd>
 *	<dt><b>Description:</b></dt>
 *	<dd>
 *		<p>none
 *	</dd>
 * </dl>
 *
 * @author Administrator
 * @version 1.0, 2015年6月18日
 * @since record
 *
 */
@Service("jmsService")
public class JmsServiceImpl implements JmsService {
 
	/**
	 * 日志处理器
	 */
	Logger logger = Logger.getLogger(JmsServiceImpl.class);
 
	@Autowired
	private JmsTemplate jmsTemplate;
 
	/**
	 * 点对点(Point-to-Point)目的地
	 */
	@Autowired
	Destination queueDestination;
 
	/**
	 * 发布/订阅(Publish/Subscribe)目的地
	 */
	@Autowired
	Destination topicDestination;
 
	@Override
	/**
	 * 想默认目的地发送消息
	 * @param message
	 */
	public void sendMessage(final String message) {
		logger.info("生产者向默认目的地发了一个消息:" + message);
		jmsTemplate.send(queueDestination, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage(message);
			}
		});
	}
 
	@Override
	/**
	 * 指定目的地发送消息
	 * @param destination
	 * @param message
	 */
	public void sendMessage(final Destination destination, final String message){
		logger.info("生产者向指定目的地发了一个消息:" + message);
		jmsTemplate.send(destination, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage(message);
			}
		});
	}
}

ConsumerMessageListener.java

ConsumerMessageListener.java
package com.gxx.record.jms;
 
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
 
import org.apache.log4j.Logger;
 
/**
 * 
 * <dl>
 *    <dt><b>Title:</b></dt>
 *    <dd>
 *    	消息监听器
 *    </dd>
 *    <dt><b>Description:</b></dt>
 *    <dd>
 *    	<p>none
 *    </dd>
 * </dl>
 *
 * @author Administrator
 * @version 1.0, 2015年9月24日
 * @since record
 *
 */
public class ConsumerMessageListener implements MessageListener {
 
	/**
	 * 日志处理器
	 */
	Logger logger = Logger.getLogger(ConsumerMessageListener.class);
 
	@Override
	public void onMessage(Message message) {
		//这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换
		TextMessage textMsg = (TextMessage) message;
		try {
			logger.info("消息监听器接收内容:" + textMsg.getText());
		} catch (JMSException e) {
			logger.error("消息监听器接收内容异常发生!", e);
		}
	}
 
}

JmsController.java

JmsController.java
package com.gxx.record.web.jms;
 
import java.util.HashMap;
import java.util.Map;
 
import javax.servlet.http.HttpServletRequest;
 
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
 
import com.gxx.record.service.JmsService;
 
/**
 * UserController
 * 
 * @author gxx
 */
@Controller
@RequestMapping("/jms/")
public class JmsController {
	/**
	 * 日志处理器
	 */
	private final Logger logger = Logger.getLogger(JmsController.class);
 
	@Autowired
	private JmsService jmsService;
 
	/**
	 * 发送消息
	 * @param request
	 * @return
	 */
	@RequestMapping(value = "/send",produces="application/json")
	public @ResponseBody Map<String, Object> registJsp(HttpServletRequest request) {
		String content = request.getParameter("content");
		logger.info("发送消息:内容=[" + content + "]");
		jmsService.sendMessage(content);
		Map<String, Object> resultMap = new HashMap<String, Object>();
		resultMap.put("success", true);
		return resultMap;
	}
}

发送消息

请求地址Controller:http://ip:port/record/jms/send.htm?content=hello baby~~~,日志打出如下:

2015-09-24 16:25:08,920 [http-80-exec-7] INFO  [com.gxx.record.web.jms.JmsController:40] - 发送消息:内容=[hello baby~~~]
2015-09-24 16:25:08,921 [http-80-exec-7] INFO  [com.gxx.record.service.impl.JmsServiceImpl:49] - 生产者发了一个消息:hello baby~~~
2015-09-24 16:25:09,100 [jmsContainer-1] INFO  [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容:hello baby~~~

配置多个监听器

如果监听目的地为点对点(Point-to-Point),则只会有一个监听器监听到,日志如下:

2015-09-24 16:25:08,920 [http-80-exec-7] INFO  [com.gxx.record.web.jms.JmsController:40] - 发送消息:内容=[hello baby~~~]
2015-09-24 16:25:08,921 [http-80-exec-7] INFO  [com.gxx.record.service.impl.JmsServiceImpl:49] - 生产者发了一个消息:hello baby~~~
2015-09-24 16:25:09,100 [jmsContainer-1] INFO  [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容:hello baby~~~

如果监听目的地为发布/订阅(Publish/Subscribe),则多个监听器都会监听到,日志如下:

2015-09-24 16:32:17,791 [http-80-exec-3] INFO  [com.gxx.record.web.jms.JmsController:40] - 发送消息:内容=[hello baby~~~]
2015-09-24 16:32:17,791 [http-80-exec-3] INFO  [com.gxx.record.service.impl.JmsServiceImpl:52] - 生产者向默认目的地发了一个消息:hello baby~~~
2015-09-24 16:32:17,922 [jmsContainer_2-1] INFO  [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容:hello baby~~~
2015-09-24 16:32:17,922 [jmsContainer-1] INFO  [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容:hello baby~~~

顺序读取消息

修改代码,使得处理一条消息需要睡眠3秒钟,观察日志,显示每条消息依次被处理,不会一时间涌入,造成系统压力。

ConsumerMessageListener.java
package com.gxx.record.jms;
 
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
 
import org.apache.log4j.Logger;
 
/**
 * 
 * <dl>
 *    <dt><b>Title:</b></dt>
 *    <dd>
 *    	消息监听器
 *    </dd>
 *    <dt><b>Description:</b></dt>
 *    <dd>
 *    	<p>none
 *    </dd>
 * </dl>
 *
 * @author Administrator
 * @version 1.0, 2015年9月24日
 * @since record
 *
 */
public class ConsumerMessageListener implements MessageListener {
 
	/**
	 * 日志处理器
	 */
	Logger logger = Logger.getLogger(ConsumerMessageListener.class);
 
	@Override
	public void onMessage(Message message) {
		//这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换
		TextMessage textMsg = (TextMessage) message;
		try {
			logger.info("消息监听器接收内容开始===:" + textMsg.getText());
			Thread.currentThread();
			Thread.sleep(3000);
			logger.info("消息监听器接收内容结束===:" + textMsg.getText());
		} catch (JMSException e) {
			logger.error("消息监听器接收内容异常发生!", e);
		} catch (Exception e) {
			logger.error("睡眠3秒,异常发生!");
		}
	}
 
}
2015-09-25 12:56:19,675 [jmsContainer-1] INFO  [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容开始===:发送消息~~~
2015-09-25 12:56:22,676 [jmsContainer-1] INFO  [com.gxx.record.jms.ConsumerMessageListener:43] - 消息监听器接收内容结束===:发送消息~~~
2015-09-25 12:56:22,676 [jmsContainer-1] INFO  [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容开始===:发送消息~~~
2015-09-25 12:56:25,676 [jmsContainer-1] INFO  [com.gxx.record.jms.ConsumerMessageListener:43] - 消息监听器接收内容结束===:发送消息~~~
2015-09-25 12:56:25,676 [jmsContainer-1] INFO  [com.gxx.record.jms.ConsumerMessageListener:40] - 消息监听器接收内容开始===:发送消息~~~
2015-09-25 12:56:28,677 [jmsContainer-1] INFO  [com.gxx.record.jms.ConsumerMessageListener:43] - 消息监听器接收内容结束===:发送消息~~~
分享/技术/jms/spring整合activemq.txt · 最后更改: 2015/09/25 13:09 由 gxx