日期:2014-05-16 浏览次数:20588 次
一个仅仅部署在4台服务器上的服务,每秒向Database写入数据超过100万行数据,每分钟产生超过1G的数据。而每台服务器(8核12G)上CPU占用不到100%,load不超过5。这是怎么做到呢?下面将给你描述这个架构,它的核心是一个高效缓冲区设计,我们对它的要求是:
1,该缓存区要尽量简单
2,尽量避免生产者线程和消费者线程锁
3,尽量避免大量GC
提高硬盘写入IO的银弹无疑是批量顺序写,无论是在业界流行的分布式文件系统或数据,HBase,GFS和HDFS,还是以磁盘文件为持久化方式的消息队列Kafka都采用了在内存缓存数据然后再批量写入的策略。这一个策略的性能核心就是内存中缓冲区设计。这是一个经典的数据产生者和消费者场景,缓冲区的要求是当同步写入和读出时:(1)写满则不写(2)读空则不读(3)不丢失数据(4)不读重复数据。最直接也是常用的方式就是JDK自带的LinkedBlockingQueue。LinkedBlockingQueue是一个带锁的消息队列,写入和读出时加锁,完全满缓冲区上面的四个要求。但是当你的程序跑起来之后,看看那个线程CPU消耗最高?往往就是在线程读LinkedBlockingQueue锁的时候,这也成为很多对吞吐要求很高的程序的性能瓶颈。
解决加锁队列产生的性能问题?Disruptor是一个选择。Disruptor是什么?看看开源它的公司LMAX自己是怎么介绍的:
?
我们花费了大量的精力去实现更高性能的队列,但是,事实证明队列作为一种基础的数据结构带有它的局限性——在生产者、消费者、以及它们的数据存储之间的合并设计问题。Disruptor就是我们在构建这样一种能够清晰地分割这些关注问题的数据结构过程中所诞生的成果。
?
OK,Disruptor是用来解决我们这个场景的问题的,而且它不是队列。那么它是什么并且如何实现高效呢?我这里不做过多介绍,网上类似资料很多,简单的总结:
1,Disruptor使用了一个RingBuffer替代队列,用生产者消费者指针替代锁。
2,生产者消费者指针使用CPU支持的整数自增,无需加锁并且速度很快。Java的实现在Unsafe package中。
?
使用Disruptor,首先需要构建一个RingBuffer,并指定一个大小,注意如果RingBuffer里面数据超过了这个大小则会覆盖旧数据。这可能是一个风险,但Disruptor提供了检查RingBuffer是否写满的机制用于规避这个问题。而且根据maoyidao测试结果,写满的可能性不大,因为Disrutpor确实高效,除非你的消费线程太慢。
?
并且使用一个单独的线程去处理RingBuffer中的数据:
?
RingBuffer ringBuffer = new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY, new SingleThreadedClaimStrategy(RING_SIZE), new SleepingWaitStrategy()); SequenceBarrier barrier = ringBuffer.newBarrier(); BatchEventProcessor<ValueEvent> eventProcessor = new BatchEventProcessor<ValueEvent>(ringBuffer, barrier, handler); ringBuffer.setGatingSequences(eventProcessor.getSequence()); // only support single thread new Thread(eventProcessor).start();
?
ValueEvent通常是个自定义的类,用于封装你自己的数据:
?
public class ValueEvent { private byte[] packet; public byte[] getValue() { return packet; } public void setValue(final byte[] packet) { this.packet = packet; } public final static EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>() { public ValueEvent newInstance() { return new ValueEvent(); } }; }
?
?
生产者通过RingBuffer.publish方法向buffer中添加数据,同时发出一个事件通知消费者有新数据达到,并且,,,注意我们是怎么规避数据覆盖问题的:
?
// Publishers claim events in sequence long sequence = ringBuffer.next(); // if capacity less than 10%, don't use ringbuffer anymore if(ringBuffer.remainingCapacity() < RING_SIZE * 0.1) { log.warn("disruptor:ringbuffer avaliable capacity is less than 10 %"); // do something } else { ValueEvent event = ringBuffer.get(sequence); event.setValue(packet); // this could be more complex with multiple fields // make the event available to EventProcessors ringBuffer.publish(sequence); }
?
数据消费者代码在EventHandler中实现:
?
final EventHandler<ValueEvent> handler = new EventHandler<ValueEvent>() { public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception { byte[] packet = event.getValue(); // do something } };
?
很好,完成!用以上代码跑个压测,结果果然比加锁队列快很多(Disruptor官网上有benchmark数据,我这里就不提供对比数据)。好,用到线上环境。。。。结果是。。。CPU反而飙升了!??
?
书接上文,Disruptor压测良好,但上线之后CPU使用达到650%,LOAD接近300!分析diruptor源码可知,造成cpu过高的原因是 RingBuffer 的waiting策略,Disruptor官网例子使用的策略是 SleepingWaitStrategy ,这个类的策略是当没有新数据写入RingBuffer时,每1ns检查一次RingBuffer cursor。1ns!跟死循环没什么区别,因此CPU暴高。改成每100ms检查一次,CPU立刻降为7.8%。
?
为什么Disruptor官网例子使用这种有如此风险的SleepingWaitStrategy呢?原因是此策略完全不使用锁,当吞吐极高时,RingBuffer中始终有数据存在,通过轮询策略就能最大程度的把它的性能优势发挥出来。但这显然是理想状态,互联网应用有明显的高峰低谷,不可能总处于满负荷状态。因此还是Bl