日期:2014-05-16  浏览次数:20481 次

BoneCP源码——BoneCP中使用的第三方包 jsr166y fork-join框架

?

BoneCP主要使用了下面几种第三方包:

1、Google Guava library?? The Guava project contains several of Google's core libraries that we rely on in our Java-based projects: collections, caching, primitives support, concurrency libraries, common annotations, string processing, I/O, and so forth.

2、SLF4J? 日志管理

3、jsr166y? Doug Lea写的一个并发包,代码量不多,已合并到JDK7的JUC包中。BoneCP中使用该包下的LinkedTransferQueue队列来保存Connection对象:

?

			if (config.getMaxConnectionsPerPartition() == config.getMinConnectionsPerPartition()){
				// if we have a pool that we don't want resized, make it even faster by ignoring
				// the size constraints.
				connectionHandles = queueLIFO ? new LIFOQueue<ConnectionHandle>() :  new LinkedTransferQueue<ConnectionHandle>();
			} else {
				connectionHandles = queueLIFO ? new LIFOQueue<ConnectionHandle>(this.config.getMaxConnectionsPerPartition()) : new BoundedLinkedTransferQueue<ConnectionHandle>(this.config.getMaxConnectionsPerPartition());
			}

?这段神奇的代码还没研究明白用意,当设置每个partition中的最大连接数和最小连接数相等时,保存Connection对象的队列就使用无参构造函数,也就是队列的最大值为Integer.MAX_VALUE,如果不相等则队列的最大值为用户设置的最大值。

BoneCP只使用到jsr166y中的3个类,合成到它自己的jar里:

?

?

什么是Fork-Join模式?

Fork-Join是把一个任务递归的分解成多个子任务,直到每个子问题都足够小,然后把这些问题放入队列中等待处理(fork步骤),接下来等待所有子问题的结果(join步骤),最后把多个结果合并到一起。

?

?更多的Fork-Join学习资料:

JDK 7 中的 Fork/Join 模式

Java 理论与实践: 应用 fork-join 框架

Java 理论与实践: 应用 fork-join 框架,第 2 部分

?

?例子:在一个文本里有N多个数据,使用多线程最快求和

这个是之前用JUC包实现的一个问题,使用Fork-Join模式很容易解决上JUC时的问题:

此算法的缺点有待改进的地方是结果汇总时是被动去检测,而不是某个结果计算完成后主动去汇总,既然是分段计算,如果数据量足够大时,应该采用递归去实现分段汇总会更好

?注:下面代码在JDK7下运行

/**
 * Huisou.com Inc.
 * Copyright (c) 2011-2012 All Rights Reserved.
 */

package thread;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

/**
 * @description
 * 
 * @author chenzehe
 * @email hljuczh@163.com
 * @create 2013-3-19 上午10:12:31
 */

public class CalculateWithForkJoin {
	public static void main(String[] args) {
		int[] numbers = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18 };
		NumbersStructure numbersStructure = new NumbersStructure(numbers, 0, numbers.length);
		int threshold = 5;
		int nThreads = 5;
		CalculateForkJoinTask calculateForkJoinTask = new CalculateForkJoinTask(numbersStructure, threshold);
		ForkJoinPool forkJoinPool = new ForkJoinPool(nThreads);
		forkJoinPool.invoke(calculateForkJoinTask);
		int sum = calculateForkJoinTask.sum;
		System.out.println(sum);
	}
}

class CalculateForkJoinTask extends RecursiveAction {
	private static final long		serialVersionUID	= -3958126944793236011L;
	private final int				threshold;
	private final NumbersStructure	numbersStructure;
	public int						sum;
	
	public CalculateForkJoinTask(NumbersStructure numbersStructure, int threshold) {
		this.numbersStructure = numbersStructure;
		this.threshold = threshold;
	}
	
	@Override
	protected void compute() {
		if (numbersStructure.size < threshold) {
			sum = numbersStructure.calculateSum();
		}
		else {
			int midpoint = numbersStructure.size / 2;
			CalculateForkJoinTask left = new CalculateForkJoinTask(numbersStructure.subproblem(0, midpoint), threshold);
			CalculateForkJoinTask right = new CalculateForkJoinTask(numbersStructure.subproblem(midpoint + 1, numbersStructure.size), threshold);
			invokeAll(left, right);
			sum = left.sum + right.sum;
		}
	}
	
}