日期:2014-05-16 浏览次数:20506 次
本文主要目的实现activemq和spring将消息写入数据库的方法:
activemq.xml的内容如下:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <!-- Allows us to use system properties as variables in this configuration file --> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/> <broker useJmx="false" brokerName="jdbcBroker" xmlns="http://activemq.apache.org/schema/core"> <persistenceAdapter> <jdbcPersistenceAdapter dataDirectory="activemq-data" dataSource="#derby-ds"/> </persistenceAdapter> <transportConnectors> <transportConnector name="default" uri="tcp://localhost:61619"/> </transportConnectors> </broker> <bean id="derbyds" class="org.apache.activemq.store.jdbc.adapter.DB2JDBCAdapter"/> <!-- Embedded Derby DataSource Sample Setup --> <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource"> <property name="databaseName" value="derbydb"/> <property name="createDatabase" value="create"/> </bean> </beans>
?
activemq-jdbc.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean"> <property name="config" value="classpath:activemq.xml"/> <property name="start" value="true"/> </bean> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="broker"> <property name="brokerURL" value="tcp://localhost:61619"/> </bean> </beans>
?
消息生产者:
package easyway.activemq.app.demo3; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.StreamMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * 消息的创建者 * @author longgangbai * */ public class StreamMsgProducer { public static void main(String[] args) { ClassPathXmlApplicationContext ctx=new ClassPathXmlApplicationContext("activemq-jdbc.xml"); ActiveMQConnectionFactory activeMqfactory=(ActiveMQConnectionFactory)ctx.getBean("connectionFactory"); Connection conn = null; try { conn = activeMqfactory.createConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination queue = session.createQueue("streamMsg"); MessageProducer producer = session.createProducer(queue); File file=new File("C:\\send.txt"); InputStream in = new FileInputStream(file); byte[] buffer = new byte[2048]; int c = -1; while ((c = in.read(buffer)) > 0) { StreamMessage smsg = session.createStreamMessage(); smsg.writeBytes(buffer, 0, c); producer.send(smsg); System.out.println("send: " + c); } in.close(); } catch (JMSException e) { e.printStackTrace(); } catch (IOException e