日期:2014-05-20  浏览次数:21252 次

JMS:Java消息服务(Java Message Service)初学笔记
JMS:Java消息服务(Java Message Service)
初学JMS,记录下心得
一什么是JMS,什么是activeMq?
1JMS是一种有关面向消息中间件的技术规范,读了李刚的经典java EE企业应用实战之后。我觉得可以理解为,为了解决RMI,CORBA,WebService等技术的同步通信,客户端和服务器端生命周期耦合等缺点的一种技术。它实现了完全解耦,它能让消息生产者和消息消费者之间的完全隔离。而这种隔离就需要面向消息服务器的支持,这个消息服务器就是activeMq和类似其他的服务器。此外,我们也可以理解JMS类似JDBC,他屏蔽了不同厂家的消息服务器。不管底层采用何种消息服务器。应用程序只要使用JMS提供的API即可,不需要了解底层的各个服务器的细节内容。
2.activeMq,引用百度百科的说法,要使用Java消息服务,你必须要有一个JMS提供者,管理会话和队列。这里我们理解为它就是JMS服务的提供者。JMS消息生产者和消费者都需要连接到这个JMS服务提供者。
3.注意JMS中的消息生产者,消费者和消息目的,我们可以把它看做是使用JMS服务提供者所提供的服务的客户端。所以就有了connection.setClientID("MyClient");当然从这句话的实际用途来看,他是为了完成可靠地消息传输的。消息目的,我们理解为消息生产者发送消息的目的地,也是消息消费者获取消息的消息源。我理解其就存在消息提供者

二 Java消息服务支持两种消息模型:Point-to-Point消息(P2P;即:点对点)和发布订阅消息(Publish Subscribe messaging,简称Pub/Sub;即:广播式)
1什么是P2P?引用百度百科的说法,P2P是英文Peer-to-Peer(对等)的简称,又被称为“点对点”。“对等”技术,是一种网络新技术,依赖网络中参与者的计算能力和带宽,而不是把依赖都聚集在较

少的几台服务器上。我的理解就是一台计算机并不是通过服务器来和其他计算机连接,而是直接和其他计算机连接。
2什么是Pub/Sub?在这种模型中,信息消费者和生产者参与消息的传输。生产者“发布”事件,而消费者“订阅”感兴趣的事件并消费事件。生产者把消息把与特定主题关联起来,然后消息传递系统根

据消费者所注册的感兴趣主题,把消息路由给消费者。也就是说这种就是广播式的模式。

或者我们也可以理解为,PTP是类似邮件系统,而Pub/Sub类似BBS,邮件系统不管消息消费者是否在线,只要消息还处于有效期内,该消息消费者总可以从消息队列中提取该消息,而Pub/Sub,如果消息消费者处于离线状态,就会错过消息,正像如果我们一段时间不登陆BBS,再登陆的话,之前的信息就不知道
三JMS它是使用JNDI的相应方法来实现其操作的。
1JNDI,一组在java应用中访问命名和目录服务的API,什么是目录服务,我的理解是它提供了一个服务名字和服务的具体网络物力地址的映射,有了这个映射,我们只需要通过名字就可以访问到具体的

服务。

四一个典型的JMS程序要经过以下步骤才能开始创建和使用消息
1、通过JNDI查询ConnectionFactory
2、用ConnectionFactory创建一个Connection
3、用Connection创建一个或多个Session
4、通过JNDI查询一个或多个Destination
5、用Session和Destination创建对应的MessageProducer和MessageConsumer
6、启动Connection

消息选择是一种选择机制,类似于SQL的查询条件。

五实例

1.当然我们必须先启动activeMq服务器,提供JMS服务。

2.发送消息例子:
package com.flvcd.servlet;
import java.io.IOException; 
import java.io.PrintWriter; 
import javax.jms.Connection; 
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode; 
import javax.jms.Destination; 
import javax.jms.JMSException;
import javax.jms.Message; 
import javax.jms.MessageListener; 
import javax.jms.MessageProducer;
import javax.jms.Session; 
import javax.naming.Context; 
import javax.naming.InitialContext; 
import javax.naming.NamingException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest; 
import javax.servlet.http.HttpServletResponse;
public class MyPublish extends HttpServlet implements MessageListener {
	private InitialContext initCtx; 
	private Context envContext; 
	private ConnectionFactory connectionFactory;  
	private Connection connection;  
	private Session jmsSession; 
	private MessageProducer producer;
	public void onMessage(Message message) {
		
	}
	public MyPublish(){
		super();
	}
	public void destroy(){
		super.destroy();
	}
	public void doGet(HttpServletRequest request, HttpServletResponse response)    throws ServletException, IOException {
		doPost(request, response);  
	}
	public void doPost(HttpServletRequest request, HttpServletResponse response)throws ServletException, IOException{
		String content=request.getParameter("content");   //设置持久方式   
		try {    
			producer.setDeliveryMode(DeliveryMode.PERSISTENT);   
			Message testMessage = jmsSession.createMessage();    // 发布刷新文章消息 
			testMessage.setStringProperty("RefreshArticleId", content);    
			producer.send(testMessage);    // 发布刷新帖子消息    
			testMessage.clearProperties();   
			testMessage.setStringProperty("RefreshThreadId", content);   
			producer.send(testMessage);   
			} catch (Exception e) {  
				e.printStackTrace();   
			}
		}
	public void init() throws ServletException {   // Put your code here  
		try {    
			initCtx = new InitialContext();  
			envContext = (Context) initCtx.lookup("java:comp/env");    
			connectionFactory = (ConnectionFactory) envContext.lookup("jms/NormalConnectionFactory");  
			connection = connectionFactory.createConnection();   
			jmsSession = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);    
			producer = jmsSession.createProducer((Destination) envContext.lookup("jms/topic/MyTopic"));    
			} catch (NamingException e) {  
				e.printStackTrace();  
				} catch (JMSException e) { 
					e.printStackTrace();
				}
	}
		
	
}



 

3.接收消息例子:

package com.flvcd.servlet;

import java.io.*;
import javax.servlet.*;
import javax.servlet.http.*;
import javax.naming.*;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JMSListener extends HttpServlet implements MessageListener{
    /** 初始化jms连接,创建topic监听器 */
    public void init(ServletConfig config) throws ServletException{        
        try {
            InitialContext initCtx = new InitialContext();
            Context envContext = (Context) initCtx.lookup("java:comp/env");
            ConnectionFactory connectionFactory = (ConnectionFactory) envContext.lookup("jms/FailoverConnectionFactory");
            Connection connection = connectionFactory.createConnection();
            connection.setClientID("MyClient");
            Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
            //普通消息订阅者,无法接收持久消息
            //MessageConsumer consumer = jmsSession.createConsumer((Destination) envContext.lookup("jms/topic/MyTopic"));
            //基于Topic创建持久的消息订阅者,前提:Connection必须指定一个唯一的clientId,当前为MyClient
            TopicSubscriber consumer = jmsSession.createDurableSubscriber((Topic) envContext.lookup("jms/topic/MyTopic"), "MyClient");
            consumer.setMessageListener(this);
            connection.start();
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    /** 接收消息,做对应处理 */
    public void onMessage(Message message) {
        if (checkText(message, "RefreshArticleId") != null) {
            String articleId = checkText(message, "RefreshArticleId");
            System.out.println("接收刷新文章消息,开始刷新文章ID=" + articleId);
        } 
        else if (checkText(message, "RefreshThreadId") != null) {
            String threadId = checkText(message, "RefreshThreadId");
            System.out.println("接收刷新论坛帖子消息,开始刷新帖子ID=" + threadId);
        } else {
            System.out.println("接收普通消息,不做任何处理!");
        }
    }
    private static String checkText(Message m, String s) {
        try {
            return m.getStringProperty(s);
        } catch (JMSException e) {
            e.printStackTrace(System.out);
            return null;
        }
    }
}


注意这里是使用MessageListener,也就是使用异步接受的方式,接收程序在没有收到消息之前不会被阻塞。如果有JMS消息到达时,JMS消息监听器的监听方法onMessage()方法会被触发。同时,我们必须为消息消费者绑定消息监听器,这就引出了可靠地JMS订阅者。
consumer.setMessageListener(this);

4.配置web.xml
<servlet>  
 	 <servlet-name>jms-listener</servlet-name>  
  	<servlet-class>  com.flvcd.servlet.JMSListener</servlet-class>  
  	<load-on-startup>1</load-on-startup>  
  </servlet>


5.此外还有修改context.xml以JNDI的方式定义了ActiveMQ的broker连接url、Topic和Queue。和修改activemq.xml等内容
详见http://wenku.baidu.com/view/1cab03d380eb6294dd886cbc.html
六编程时报的错误处理
关于jndi报的一些错误
错误一:
引用
Need to specify class name in environment or system property, or as an applet parameter, or in an application resource file:  java.naming.factory.initial
解决方法:配置properties
  Properties properties = new Properties(); 
            properties.setProperty(Context.INITIAL_CONTEXT_FACTORY,"org.jnp.interfaces.NamingContextFactory"); 
            properties.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces"); 
            properties.setProperty(Context.PROVIDER_URL, "localhost:1099");  
关于jndi的写法wangh8080.blog.163.com/blog/static/197848297201241644436227/
错误二:
引用
cannot instantiate class: org.jnp.interfaces.NamingContextFactory [Root exception is java.lang.ClassNotFoundException: org.jnp.interfaces.NamingContextFactory]
 
解决方法:引入jbossall-client.jar包
错误三:
引用
Could not obtain connection to any of these urls: localhost:1099
解决方法:eclipse集成jboss,
什么是JBoss?它是一个基于J2EE的开放源代码的应用服务器。
详情参见http://www.cnblogs.com/FIX-GPS/archive/2012/06/27/2566370.html
当然也可以使用http://wenku.baidu.com/view/1cab03d380eb6294dd886cbc.html来编写自己的JMS实例