你的位置:首页 > Java教程

[Java教程]JMS 简介


 

JMS (Java Message Service)。用于在Application之间进行message传递。

 

Enterprise Message

         WebSphere MQ、SonicMQ、Microsoft Message Queue(MSMQ)、ActiveMQ、EJB中的Message Bean、SOA中的ESB(Enterprise Message Bus)等都是Enterprise Message。

 

使用Enterprise Message时,可以有下列方式:

集中式

 

分布式

 

 

 

混合式

在每个Client上都安装一个Daemon程序,Client与Daemon使用TCP通信,Daemon与其它Daemon之间使用MultiCast。

 

 

JMS消息模型

 

         JMS支持两种消息模型:point-to-point、publish-and-subscribe。

 

 

  

point-to-point

 

 

 

 

在P2P模型中,一个JMS Client可以发送、接收消息到(从)一个queue。可以是同步方式,也可以是异步方式。消息以PULL(拉)的方式获取。发送方称为sender,接收方称为receiver。

 

publish-and-subscribe

        

 

 

消息以push方式发送。即使订阅者是离线状态,也会进行发送。发送方称为publisher,接收方称为subscriber。

 

JMS API 简述

         JMS 只是定义了接口,并没有具体的实现,实现都是由相关的厂商来完成的。这一点类似于JDBC。

         JMS的API可以分为3个主要部分:1)通用API, 2)p2p API,3)pub/sub API。

几个主要的接口是:

·ConnectionFactory

·Destination

·Connection

·Session

·Message

·MessageProducer

·MessageConsumer

 

         在上述接口中,ConnectionFactory、Destination是必须通过JNDI来取得,其实的接口都是根据这两个来创建的。取得ConnectionFactory后,就可以取得Connection;有了Connection就可以创建Session;有了Session,就可以创建出Message、MessageProducer、MessageConsumer。

         实例的创建关系如下:

 

 

         在JMS中,Session对象会持有transactional,而不是Connection,这一点与JDBC是不一样的。也就是说,在使用JMS时,一个应用只会创建一个Connection对象,可以根据这个Connection对象创建出多个Session对象。

 

 

 

Point-to-point

 

 

 

 

 

这是为P2P设计的接口。与通用接口一一对应。

 

Publish/Subscribe

 

 

与此类似,也为pub/sub专门设计了接口:

 

 

 

JMS Pub/Sub示例

 

要运行JMS,需要有JMS代理服务器。下面这个例子中,会使用activemq作为代理服务器。

在安装完成activemq后,启动它。

         示例程序是一个简易Chat Room。

 

package com.fjn.java.jms.topic;import java.io.IOException;import java.util.Properties;import java.util.Scanner;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import javax.jms.TopicConnection;import javax.jms.TopicConnectionFactory;import javax.jms.TopicPublisher;import javax.jms.TopicSession;import javax.jms.TopicSubscriber;import javax.naming.InitialContext;import javax.naming.NamingException;public class Chat implements MessageListener {  private TopicSession pubSession;  private TopicPublisher publisher;  private TopicConnection connection;  private String username;  public Chat(String factoryJdni, String topicName, String username)      throws NamingException, JMSException, IOException {    Properties props=new Properties();    props.load(Chat.class.getResourceAsStream("jndi.properties"));    InitialContext ctx = new InitialContext(props);    TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx        .lookup(factoryJdni);    Topic topic = (Topic) ctx.lookup(topicName);    TopicConnection conn = connFactory.createTopicConnection();    TopicSession pubSession = conn.createTopicSession(false,        Session.AUTO_ACKNOWLEDGE);    TopicSession subSession = conn.createTopicSession(false,        Session.AUTO_ACKNOWLEDGE);    TopicPublisher publisher = pubSession.createPublisher(topic);    TopicSubscriber subscriber = subSession.createSubscriber(topic, null,        true);    subscriber.setMessageListener(this);    this.connection = conn;    this.publisher = publisher;    this.pubSession = pubSession;    this.username = username;    conn.start();  }  @Override  public void onMessage(Message msg) {    try {      TextMessage text = (TextMessage) msg;      System.out.println(text.getText());    } catch (JMSException e) {      e.printStackTrace();    }  }  protected void writeMessage(String text) throws JMSException{    TextMessage msg=pubSession.createTextMessage();    msg.setText(username+": "+text);    publisher.publish(msg);  }    public void close() throws JMSException{    connection.close();  }    public static void main(String[] args) throws NamingException, JMSException, IOException {    Chat chat=new Chat("TopicCF","topic1","ZhangSan");    Scanner scanner=new Scanner(System.in);    while(true){      String line=scanner.nextLine();      if("exit".equals(line)){        chat.close();        System.exit(0);        scanner.close();      }else{        chat.writeMessage(line);      }    }  }}package com.fjn.java.jms.topic;import java.io.IOException;import java.util.Scanner;import javax.jms.JMSException;import javax.naming.NamingException;public class Chat2 {  public static void main(String[] args) throws NamingException, JMSException, IOException {    Chat chat=new Chat("TopicCF","topic1","LiSi");    Scanner scanner=new Scanner(System.in);    while(true){      String line=scanner.nextLine();      if("exit".equals(line)){        chat.close();        System.exit(0);        scanner.close();      }else{        chat.writeMessage(line);      }    }  }}

 

其实,你可以将程序的main方法稍作改动,让创建Chat的三个参数从命名行取得。这样,就可以启动任意客户端了。

 

Jndi.properties文件内容:

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactoryjava.naming.provider.url = tcp://localhost:61616java.naming.security.principal=systemjava.naming.security.credentials=managerconnectionFactoryNames = TopicCFtopic.topic1 = jms.topic1

 


这样就可以进行多个客户端对话了,也就是群组对话了。