日期:2014-05-16  浏览次数:20491 次

ActiveMQ消息存储-KahaDB消息存储和AMQ消息存储

????? JMS 规范支持两种类型的消息传递:持久和非持久的。对于持久的传递的消息必须被记录至稳定的存储去,

而对非持久的存储,JMS Provider必须尽力传递该消息,但是不会将该消息子记录至稳定的存储去的。

????? 持久的消息使用:当消息传递给Broker后,不管消息发送时Consumer是否运行,该消息对Consumer

始终是可用的。

????? 只有当消息被Consumer消费或确认时,它才会被从消息存储中删除。

????? 非持久的消息使用:用于发送通知或非即时的数据。

?

????? Storage for Queue: 消息存储是按照First in, First out(FIFO)的顺序存储的,One message is

dispatched to a? singleconsumer at a time. 只有当消息被消费或被确认时,它才会被从消息存储中删除。

????? For durable sunscribers to a topic: 每个Consumer获取一个消息的Copy。为了节省存储空间,每个

消息只有一个Copy被Broker存储。存储维护Durable Subscriber的指针,该指针指向下一个存储的消息,并

且分发该消息的Copy给它的Consumer. 因为在这种情况下,每个消息都有多个潜在消费者,所以消息只有当

被成功的传递给每个注册的Durabel Subscriber后,才会被删除。

?

1. KahaDB消息存储

??? 这是一种基于文件的消息存储,并且联合Journal事务,可稳定存储消息并且恢复消息。KahaDB消息存储使

用事务Log作为它的索引,并且对所有的Destination仅仅使用一个索引文件,该索引文件是事务日志文件组中

消息ID的索引。XML文件中配置KahaDB存储如

下:

<broker brokerName="broker" persistent="true" useShutdownHook="false">
	<persistenceAdapter>
		<kahaDB directory="activemq-data" journalMaxFileLength="16mb" />
	</persistenceAdapter>
</broker>

???? Java语言实现配置:

public class EmbeddedBrokerUsingAMQStoreExample {
	BrokerService createEmbeddedBroker() throws Exception {
		BrokerService broker = new BrokerService();
		File dataFileDir = new File("target/amq-in-action/kahadb");
		KahaDBStore kaha = new KahaDBStore();
		kaha.setDirectory(dataFileDir);
		// Using a bigger journal file
		kaha.setJournalMaxFileLength(1024 * 100);
		// small batch means more frequent and smaller writes
		kaha.setIndexWriteBatchSize(100);
		// do the index write in a separate thread
		kaha.setEnableIndexWriteAsync(true);
		broker.setPersistenceAdapter(kaha);
		// create a transport connector
		broker.addConnector("tcp://localhost:61616");
		// start the broker
		broker.start();
		return broker;
	}
} 

2. KahaDB消息存储原理

??? KahaDB消息存储联合使用快速的事务处理:Journal以及数据日志文件,该日志文件是消息ID的索引,并且

??? 在内存中缓存消息。

??? 参考:In KahaDB messages are stored in indexed log files and cached for performance.png

3. KahaDB消息存储目录结构

??? 包含以下文件及文件夹:db log files, archieve directory, db.data.db.redo.

4. 配置KahaDB消息存储

??? 参考:Manning ActiveMQ inAction.pdf(101-102)

?

?

????? AMQ消息存储,类似于KahaDB。它使用了journal事务来确保稳定的持久、恢复、高性能的索引。这种存

储方式主要用于大数据(消息)量的存储。对每个索引文件,有两个单独的文件;一个用于每个Destination。如

果每个Broker有上千个Queue,此时使用AMQ消息存储是不合适的,并且恢复数据也很慢,因为所有的索引文

件需要rebuid,需要Broker扫描所有的Data logs去再次构建索引。

?

1. AMQ消息存储

??? 与KahaDB类似,主要包括以下几部分:

??? (1) Data Logs: 作为消息journal.

??? (2) Cache: 在消息写入data log后,在内存中保持消息用于快速恢复。

??? (3) Reference Store:在journal保存消息的引用,按照消息ID索引。

2. AMQ消息目录结构

??? 包括:lock file,temp-storage directory, kr-store,journal directory, archive directory.

3. AMQ消息存储配置

<?xml version="1.0" encoding="UTF-8"?>
<beans>
	<broker xmlns="http://activemq.apache.org/schema/core">
		<persistenceAdapter>
			<amqPersistenceAdapter directory="target/Broker2-data/activemq-data"
				syncOnWrite="true" indexPageSize="16kb" indexMaxBinSize="100"
				maxFileLength="10mb" />
		</persistenceAdapter>
	</broker>
</beans>

???? 详细配置参考:Manning ActiveMQ inAction.pdf(105-106)