你的位置:首页 > Java教程

[Java教程]Java ActiveMQ 讲解(二)Spring ActiveMQ整合+注解消息监听


对于ActiveMQ消息的发送,原声的api操作繁琐,而且如果不进行二次封装,打开关闭会话以及各种创建操作也是够够的了。那么,Spring提供了一个很方便的去收发消息的框架,spring jms。整合Spring后,代码不仅变得非常优雅,而且易用性和扩展性更好。

废话不多说,直接开搞。

 

1. maven依赖

    <!-- activemq -->    <dependency>      <groupId>org.apache.xbean</groupId>      <artifactId>xbean-spring</artifactId>      <version>3.16</version>    </dependency>    <dependency>      <groupId>org.springframework</groupId>      <artifactId>spring-jms</artifactId>      <version>${springframework.version}</version>    </dependency>    <dependency>      <groupId>org.apache.activemq</groupId>      <artifactId>activemq-all</artifactId>      <version>${activemq.version}</version>    </dependency>

 

2.命名空间引入

<??><!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ --><beans ="http://www.springframework.org/schema/beans" ="http://www.springframework.org/schema/context" ="http://www.w3.org/2001/ ="http://activemq.apache.org/schema/core"  ="http://www.springframework.org/schema/jms"  xsi:schemaLocation="http://www.springframework.org/schema/beans      http://www.springframework.org/schema/beans/spring-beans-3.2.xsd    http://www.springframework.org/schema/context      http://www.springframework.org/schema/context/spring-context-3.2.xsd    http://www.springframework.org/schema/jms    http://www.springframework.org/schema/jms/spring-jms-4.1.xsd    http://activemq.apache.org/schema/core    http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">

 

3.
  <amq:connectionFactory id="jmsConnectionFactory" brokerURL="tcp://${activemq.ip}:61616" userName="${activemq.username}" password="${activemq.password}" />  <bean id="jmsConnectionFactoryExtend" class="org.springframework.jms.connection.CachingConnectionFactory">    <constructor-arg ref="jmsConnectionFactory" />    <property name="sessionCacheSize" value="100" />  </bean>  <!-- 消息处理器 -->  <bean id="jmsMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter" />  <!-- ====Producer side start==== -->  <!-- 定义JmsTemplate的Queue类型 -->  <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">    <constructor-arg ref="jmsConnectionFactoryExtend" />    <!-- 非pub/sub模型(发布/订阅),即队列模式 -->    <property name="pubSubDomain" value="false" />    <property name="messageConverter" ref="jmsMessageConverter"></property>  </bean>  <!-- 定义JmsTemplate的Topic类型 -->  <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">    <constructor-arg ref="jmsConnectionFactoryExtend" />    <!-- pub/sub模型(发布/订阅) -->    <property name="pubSubDomain" value="true" />    <property name="messageConverter" ref="jmsMessageConverter"></property>  </bean>  <jms:listener-container destination-type="queue" container-type="default" connection-factory="jmsConnectionFactoryExtend" acknowledge="auto" concurrency="5-10">    <jms:listener destination="testqueue" ref="queueReciver" />  </jms:listener-container>  

第一个是配置我们的mq连接,ip+端口号,帐号密码的信息。

第二个是引入spring的mq连接池。可以配置缓存的连接数。

第三个是消息处理器,Spring默认提供了基于Jdk Serializable的消息处理和MappingJackson2MessageConventer,其实这两个挺常用,在Spring Redis中,在Spring MVC中,都有着这几种conventer的身影。

下面是两个发送消息的模版类,类似于之前讲过的RedisTemplate。向其注入上面定义的消息处理器,代码中我们会用到。(其实类中已经判断如果不进行注入就设置一个默认的,但是自己注入的话,方便我们控制)

 listener-container是Spring提供的一个监听器容器,用于统一控制我们的监听类来接收处理消息。这里面有一些配置,schema有说明。可以配置响应模式,消费者数量等。开启多消费者,有助于加快队列处理速度。

 

4.注解方式的实现

如果要用注解的方式,就不需要在

<bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">    <property name="connectionFactory" ref="jmsConnectionFactoryExtend"/>  </bean>    <!-- 监听注解支持 -->  <jms:annotation-driven/>

这样,配置我们消费处理类上的@listener注解,即可监听对应的queue或者topic消息。

 

5.生产者代码

队列消息:

@Resource@Component("queueSender")public class QueueSender {  @Resource(name = "jmsQueueTemplate")  private JmsTemplate jmsQueueTemplate;// 通过@Qualifier修饰符来注入对应的bean    public void send(String destination, final Object message) {    jmsQueueTemplate.send(destination, new MessageCreator() {      @Override      public Message createMessage(Session session) throws JMSException {        return jmsQueueTemplate.getMessageConverter().toMessage(message, session);      }    });  }}

 

 

订阅消息:

@Componentpublic class TopicSender {    @Resource(name="jmsTopicTemplate")  private JmsTemplate jmsTemplate;      /**   * 发送一条消息到指定的队列(目标)   * @param queueName 队列名称   * @param message 消息内容   */  public void publish(String destination,final Object message){    jmsTemplate.send(destination, new MessageCreator() {      @Override      public Message createMessage(Session session) throws JMSException {        return jmsTemplate.getMessageConverter().toMessage(message, session);      }    });  }}

 

 

6.消费者代码

package cn.test.activemq.consumer.queue;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Session;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.jms.annotation.JmsListener;import org.springframework.jms.listener.adapter.MessageListenerAdapter;import org.springframework.jms.support.converter.MessageConversionException;import org.springframework.stereotype.Component;import cn.test.MqBean;import cn.test.activemq.message.types.QueueDefination;/** * @author Han */@Component("spqueueconsumertest")public class SpringQueueReciverTest extends MessageListenerAdapter{  private static final Logger log = LoggerFactory.getLogger(SpringQueueReciverTest.class);        @JmsListener(destination=QueueDefination.TEST_QUEUE,concurrency="5-10")  public void onMessagehehe(Message message, Session session) throws JMSException {    try {      MqBean bean = (MqBean) getMessageConverter().fromMessage(message);      System.out.println(bean.getName());      System.out.println(session);      message.acknowledge();      message.acknowledge();    } catch (MessageConversionException | JMSException e) {      e.printStackTrace();    }      }  }

上面的@JmsListener(destination=QueueDefination.TEST_QUEUE,concurrency="5-10")是在用注解方式监听的时候加入。如果用

 

附上MqBean

public class MqBean implements Serializable{  private Integer age;  private String name;  public Integer getAge() {    return age;  }  public void setAge(Integer age) {    this.age = age;  }  public String getName() {    return name;  }  public void setName(String name) {    this.name = name;  }  }

 

 

运行效果截图:

 

 

ActiveMQ的基本用法大概就这些了。后续如果有新的发现包括优化发面的,再继续发吧。