日期:2014-05-16 浏览次数:20491 次
????? JMS 规范支持两种类型的消息传递:持久和非持久的。对于持久的传递的消息必须被记录至稳定的存储去,
而对非持久的存储,JMS Provider必须尽力传递该消息,但是不会将该消息子记录至稳定的存储去的。
????? 持久的消息使用:当消息传递给Broker后,不管消息发送时Consumer是否运行,该消息对Consumer
始终是可用的。
????? 只有当消息被Consumer消费或确认时,它才会被从消息存储中删除。
????? 非持久的消息使用:用于发送通知或非即时的数据。
?
????? Storage for Queue: 消息存储是按照First in, First out(FIFO)的顺序存储的,One message is
dispatched to a? singleconsumer at a time. 只有当消息被消费或被确认时,它才会被从消息存储中删除。
????? For durable sunscribers to a topic: 每个Consumer获取一个消息的Copy。为了节省存储空间,每个
消息只有一个Copy被Broker存储。存储维护Durable Subscriber的指针,该指针指向下一个存储的消息,并
且分发该消息的Copy给它的Consumer. 因为在这种情况下,每个消息都有多个潜在消费者,所以消息只有当
被成功的传递给每个注册的Durabel Subscriber后,才会被删除。
?
1. KahaDB消息存储
??? 这是一种基于文件的消息存储,并且联合Journal事务,可稳定存储消息并且恢复消息。KahaDB消息存储使
用事务Log作为它的索引,并且对所有的Destination仅仅使用一个索引文件,该索引文件是事务日志文件组中
消息ID的索引。XML文件中配置KahaDB存储如
下:
<broker brokerName="broker" persistent="true" useShutdownHook="false"> <persistenceAdapter> <kahaDB directory="activemq-data" journalMaxFileLength="16mb" /> </persistenceAdapter> </broker>
???? Java语言实现配置:
public class EmbeddedBrokerUsingAMQStoreExample { BrokerService createEmbeddedBroker() throws Exception { BrokerService broker = new BrokerService(); File dataFileDir = new File("target/amq-in-action/kahadb"); KahaDBStore kaha = new KahaDBStore(); kaha.setDirectory(dataFileDir); // Using a bigger journal file kaha.setJournalMaxFileLength(1024 * 100); // small batch means more frequent and smaller writes kaha.setIndexWriteBatchSize(100); // do the index write in a separate thread kaha.setEnableIndexWriteAsync(true); broker.setPersistenceAdapter(kaha); // create a transport connector broker.addConnector("tcp://localhost:61616"); // start the broker broker.start(); return broker; } }
2. KahaDB消息存储原理
??? KahaDB消息存储联合使用快速的事务处理:Journal以及数据日志文件,该日志文件是消息ID的索引,并且
??? 在内存中缓存消息。
??? 参考:In KahaDB messages are stored in indexed log files and cached for performance.png
3. KahaDB消息存储目录结构
??? 包含以下文件及文件夹:db log files, archieve directory, db.data.db.redo.
4. 配置KahaDB消息存储
??? 参考:Manning ActiveMQ inAction.pdf(101-102)
?
?
????? AMQ消息存储,类似于KahaDB。它使用了journal事务来确保稳定的持久、恢复、高性能的索引。这种存
储方式主要用于大数据(消息)量的存储。对每个索引文件,有两个单独的文件;一个用于每个Destination。如
果每个Broker有上千个Queue,此时使用AMQ消息存储是不合适的,并且恢复数据也很慢,因为所有的索引文
件需要rebuid,需要Broker扫描所有的Data logs去再次构建索引。
?
1. AMQ消息存储
??? 与KahaDB类似,主要包括以下几部分:
??? (1) Data Logs: 作为消息journal.
??? (2) Cache: 在消息写入data log后,在内存中保持消息用于快速恢复。
??? (3) Reference Store:在journal保存消息的引用,按照消息ID索引。
2. AMQ消息目录结构
??? 包括:lock file,temp-storage directory, kr-store,journal directory, archive directory.
3. AMQ消息存储配置
<?xml version="1.0" encoding="UTF-8"?> <beans> <broker xmlns="http://activemq.apache.org/schema/core"> <persistenceAdapter> <amqPersistenceAdapter directory="target/Broker2-data/activemq-data" syncOnWrite="true" indexPageSize="16kb" indexMaxBinSize="100" maxFileLength="10mb" /> </persistenceAdapter> </broker> </beans>
???? 详细配置参考:Manning ActiveMQ inAction.pdf(105-106)