日期:2014-05-16  浏览次数:20526 次

ActiveMQ基于derby数据库的spring整合

服务端代码:

package easyway.activemq.app.demo3;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.StreamMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * 消息的创建者
 * @author longgangbai
 *
 */
public class StreamMsgProducer {
	
	public static void main(String[] args) {
		ClassPathXmlApplicationContext ctx=new ClassPathXmlApplicationContext("activemq-jdbc.xml");
		ActiveMQConnectionFactory activeMqfactory=(ActiveMQConnectionFactory)ctx.getBean("connectionFactory");
		Connection conn = null;
		try {
			conn = activeMqfactory.createConnection();
			conn.start();
			Session session = conn.createSession(false,
					Session.AUTO_ACKNOWLEDGE);
			Destination queue = session.createQueue("streamMsg");
			MessageProducer producer = session.createProducer(queue);
             File file=new File("C:\\send.txt");
			InputStream in = new FileInputStream(file);
			byte[] buffer = new byte[2048];
			int c = -1;
			while ((c = in.read(buffer)) > 0) {
				StreamMessage smsg = session.createStreamMessage();
				smsg.writeBytes(buffer, 0, c);
				producer.send(smsg);
				System.out.println("send: " + c);
			}
			in.close();
		} catch (JMSException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			if (conn != null) {
				try {
					conn.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}
	


}

?

客户端代码:

package easyway.activemq.app.demo3;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.StreamMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * 消息的消费者
 * @author longgangbai
 *
 */
public class StreamMsgConsumer {
	public void receive() {
		
		ClassPathXmlApplicationContext ctx=new ClassPathXmlApplicationContext("activemq-jdbc.xml");
		
		
		ActiveMQConnectionFactory activeMqfactory=(ActiveMQConnectionFactory)ctx.getBean("connectionFactory");
		
		Connection conn = null;
		try {
			conn = activeMqfactory.createConnection();
			conn.start();
			Session session = conn.createSession(false,
					Session.AUTO_ACKNOWLEDGE);
			Destination queue = session.createQueue("streamMsg");
			MessageConsumer consumer = session.createConsumer(queue);

			OutputStream out = new FileOutputStream("c:\\receive.txt");
			byte[] buffer = new byte[2048];
			while (true) {
				Message msg = consumer.receive(5000);
				if (msg == null) {
					break;
				}

				if (msg instanceof StreamMessage) {
					StreamMessage smsg = (StreamMessage) msg;
					int c = smsg.readBytes(buffer);
					out.write(buffer, 0, c);
					System.out.println("Receive: " + c);
				}
			}
			out.close();
		} catch (JMSException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			if (conn != null) {
				try {
					conn.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}

	public static void main(String[] args) {
		new StreamMsgConsumer().receive();
	}
}

?

activemq的配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/