日期:2014-05-16 浏览次数:20526 次
服务端代码:
package easyway.activemq.app.demo3; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.StreamMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * 消息的创建者 * @author longgangbai * */ public class StreamMsgProducer { public static void main(String[] args) { ClassPathXmlApplicationContext ctx=new ClassPathXmlApplicationContext("activemq-jdbc.xml"); ActiveMQConnectionFactory activeMqfactory=(ActiveMQConnectionFactory)ctx.getBean("connectionFactory"); Connection conn = null; try { conn = activeMqfactory.createConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination queue = session.createQueue("streamMsg"); MessageProducer producer = session.createProducer(queue); File file=new File("C:\\send.txt"); InputStream in = new FileInputStream(file); byte[] buffer = new byte[2048]; int c = -1; while ((c = in.read(buffer)) > 0) { StreamMessage smsg = session.createStreamMessage(); smsg.writeBytes(buffer, 0, c); producer.send(smsg); System.out.println("send: " + c); } in.close(); } catch (JMSException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { if (conn != null) { try { conn.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
?
客户端代码:
package easyway.activemq.app.demo3; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.StreamMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.xbean.BrokerFactoryBean; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * 消息的消费者 * @author longgangbai * */ public class StreamMsgConsumer { public void receive() { ClassPathXmlApplicationContext ctx=new ClassPathXmlApplicationContext("activemq-jdbc.xml"); ActiveMQConnectionFactory activeMqfactory=(ActiveMQConnectionFactory)ctx.getBean("connectionFactory"); Connection conn = null; try { conn = activeMqfactory.createConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination queue = session.createQueue("streamMsg"); MessageConsumer consumer = session.createConsumer(queue); OutputStream out = new FileOutputStream("c:\\receive.txt"); byte[] buffer = new byte[2048]; while (true) { Message msg = consumer.receive(5000); if (msg == null) { break; } if (msg instanceof StreamMessage) { StreamMessage smsg = (StreamMessage) msg; int c = smsg.readBytes(buffer); out.write(buffer, 0, c); System.out.println("Receive: " + c); } } out.close(); } catch (JMSException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { if (conn != null) { try { conn.close(); } catch (JMSException e) { e.printStackTrace(); } } } } public static void main(String[] args) { new StreamMsgConsumer().receive(); } }
?
activemq的配置如下:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/