日期:2014-05-16 浏览次数:20533 次
本人实现的功能为activemq将消息持久化到数据库的方法:
消息消费者的事先代码:
package easyway.activemq.app; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; /*** * 消息持久化到数据库 * @author longgangbai */ public class MessageCustomer { private static Logger logger=LogManager.getLogger(MessageProductor.class); private String username=ActiveMQConnectionFactory.DEFAULT_USER; private String password=ActiveMQConnectionFactory.DEFAULT_PASSWORD; private String url=ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; private static String QUEUENAME="ActiveMQ.QUEUE"; protected static final int messagesExpected = 10; protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( url+"?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries="+messagesExpected); /*** * 创建Broker服务对象 * @return * @throws Exception */ public BrokerService createBroker()throws Exception{ BrokerService broker=new BrokerService(); broker.addConnector(url); return broker; } /** * 启动BrokerService进程 * @throws Exception */ public void init() throws Exception{ BrokerService brokerService=createBroker(); brokerService.start(); } /** * 接收的信息 * @return * @throws Exception */ public int receiveMessage() throws Exception{ Connection connection=connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(true, Session.SESSION_TRANSACTED); return receiveMessages(messagesExpected,session); } /** * 接受信息的方法 * @param messagesExpected * @param session * @return * @throws Exception */ protected int receiveMessages(int messagesExpected, Session session) throws Exception { int messagesReceived = 0; for (int i=0; i<messagesExpected; i++) { Destination destination = session.createQueue(QUEUENAME); MessageConsumer consumer = session.createConsumer(destination); Message message = null; try { logger.debug("Receiving message " + (messagesReceived+1) + " of " + messagesExpected); message = consumer.receive(2000); logger.info("Received : " + message); if (message != null) { session.commit(); messagesReceived++; } } catch (Exception e) { logger.debug("Caught exception " + e); session.rollback(); } finally { if (consumer != null) { consumer.close(); } } } return messagesReceived; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } }
?消息生产者的代码:
package easyway.activemq.app; import java.io.File; import java.util.Properties; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.sql.DataSource; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.store.jdbc.adapter.MySqlJDBCAdapter; import org.apache.commons.dbcp.BasicDataSourceFactory; import org.apache.log4j.LogManager; import org.apache