日期:2014-05-20  浏览次数:20760 次

java多线程生产者-消费者模式问题请教
现在公司项目中碰到这样一个需求,需要设计一个类似生产者-消费者的模块,负责数据的分发。
1、入口只有一个:可能是一条一条写入,也可能是一批一批写入。目前设计了一个List暂存这些数据。
2、数据量很大,所以目前我们限制了List的大小,当达到一定容量时,会把当前的暂存List临时存到一个消费队列中,重新new一个新的暂存list继续接受数据。
3、消费者有多个,并且每个消费者都需要消费队列中List中的所有数据。每个消费者是一个线程。
4、当多个消费者线程都消费掉队列中的一个暂存list后,该暂存的list会从队列中清理。

比我们平常讨论的生产者-消费者程序复杂很多。想了很久,没想出思路,请教各位大牛,在线急等。

------解决方案--------------------
LinkedBlockingQueue看下这个类的API
------解决方案--------------------
数据量多大,超过十万级别么
------解决方案--------------------
引用:
LinkedBlockingQueue看下这个类的API


+1
------解决方案--------------------
同1楼,也觉得BlockingQueue可以

楼主,如果是考虑到内存问题才使用暂存List的话,不是等于没解决问题么?
------解决方案--------------------
我也有类似的场景,我准备使用BlockingQueue
------解决方案--------------------
楼主的有一点描述看不懂,为什么要“当达到一定容量时,会把当前的暂存List临时存到一个消费队列中,重新new一个新的暂存list继续接受数据。”?
数据你是可以放到多个List中,但是都是在使用内存。使用一个还是多个List,都耗费几乎同样多的内存。
也就是说你这样设计数据Keeper是画蛇添足,完全没有必要的。除非你把多的数据放到文件或者数据库中,这样设计才有意义。
你可能是担心ArrayList每次删掉前面的数据时,后面的数据会搬动。但是实际上这样老是需要在前面删除数据,在后面加入数据的应用需求,你本来就不能用ArrayList而应该用LinkedList。
------解决方案--------------------
如果你对以上我说的没有异议,那么我可以给你出一个方案:
1、使用BlockingQueue作为数据容器。
2、数据中增加一个属性,用于保存使用过该数据的消费者
3、消费者每次都从BlockingQueue取数据,当发现该数据已经使用过了,就等待其他消费者使用完该数据
4、当最后一个消费者消费完该数据时,把这个数据从BlockingQueue中remove
------解决方案--------------------
貌似楼主的资源一个只能被消费一次,如有资源A,B,C,如果消费者I消费了A,那就只剩下B,C,消费者II只能去消费B,C了,是这样的吗?

引用:
如果你对以上我说的没有异议,那么我可以给你出一个方案:
1、使用BlockingQueue作为数据容器。
2、数据中增加一个属性,用于保存使用过该数据的消费者
3、消费者每次都从BlockingQueue取数据,当发现该数据已经使用过了,就等待其他消费者使用完该数据
4、当最后一个消费者消费完该数据时,把这个数据从BlockingQueue中remove

------解决方案--------------------
我好像看错了。。。

引用:
貌似楼主的资源一个只能被消费一次,如有资源A,B,C,如果消费者I消费了A,那就只剩下B,C,消费者II只能去消费B,C了,是这样的吗?


引用:
如果你对以上我说的没有异议,那么我可以给你出一个方案:
1、使用BlockingQueue作为数据容器。
2、数据中增加一个属性,用于保存使用过该数据的消费者
3、消费者每次都从BlockingQueue取数据,当发现该……

------解决方案--------------------
我觉得楼主应该说清楚需求越详细越好。
你描述是自己的解决方案,俺们都不是很清楚的需求是什么。
一知半解
------解决方案--------------------
写了个例子,看看合适不合适:

package csdn;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * data
 */
class Data{
public Data(String message){
this.message=message;
}

private String message;

public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}
}
/**
 * Data Keeper
 */
class DataKeeper<T>{
//最大队列长度
public static final int MAXQUEUELENGTH=100;
//消费者个数
public static final int MAXCONSUMER=5;