日期:2014-05-16 浏览次数:20796 次
本人实现的功能为activemq将消息持久化到数据库的方法:
消息消费者的事先代码:
package easyway.activemq.app;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
/***
* 消息持久化到数据库
* @author longgangbai
*/
public class MessageCustomer {
private static Logger logger=LogManager.getLogger(MessageProductor.class);
private String username=ActiveMQConnectionFactory.DEFAULT_USER;
private String password=ActiveMQConnectionFactory.DEFAULT_PASSWORD;
private String url=ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
private static String QUEUENAME="ActiveMQ.QUEUE";
protected static final int messagesExpected = 10;
protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
url+"?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries="+messagesExpected);
/***
* 创建Broker服务对象
* @return
* @throws Exception
*/
public BrokerService createBroker()throws Exception{
BrokerService broker=new BrokerService();
broker.addConnector(url);
return broker;
}
/**
* 启动BrokerService进程
* @throws Exception
*/
public void init() throws Exception{
BrokerService brokerService=createBroker();
brokerService.start();
}
/**
* 接收的信息
* @return
* @throws Exception
*/
public int receiveMessage() throws Exception{
Connection connection=connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
return receiveMessages(messagesExpected,session);
}
/**
* 接受信息的方法
* @param messagesExpected
* @param session
* @return
* @throws Exception
*/
protected int receiveMessages(int messagesExpected, Session session) throws Exception {
int messagesReceived = 0;
for (int i=0; i<messagesExpected; i++) {
Destination destination = session.createQueue(QUEUENAME);
MessageConsumer consumer = session.createConsumer(destination);
Message message = null;
try {
logger.debug("Receiving message " + (messagesReceived+1) + " of " + messagesExpected);
message = consumer.receive(2000);
logger.info("Received : " + message);
if (message != null) {
session.commit();
messagesReceived++;
}
} catch (Exception e) {
logger.debug("Caught exception " + e);
session.rollback();
} finally {
if (consumer != null) {
consumer.close();
}
}
}
return messagesReceived;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
}
?消息生产者的代码:
package easyway.activemq.app;
import java.io.File;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.sql.DataSource;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.jdbc.adapter.MySqlJDBCAdapter;
import org.apache.commons.dbcp.BasicDataSourceFactory;
import org.apache.log4j.LogManager;
import org.apache