日期:2014-05-16 浏览次数:21100 次
三个文件
1 tpool.h
/*
 * One unit of work for the pool: a function pointer plus the single
 * argument to pass to it.
 */
typedef struct tpool_work {
/* Function to call for this work item. */
void (*routine)(void *);
/* Opaque argument supplied alongside routine. */
void *arg;
/*
 * Link to another work item. NOTE(review): tpool_init allocates the queue
 * as a flat array indexed by front/rear, so this field looks unused in the
 * code visible here -- confirm before relying on it.
 */
struct tpool_work *next;
} tpool_work_t;
/*
 * Thread pool state. Note that tpool_t is a POINTER to struct tpool, so
 * passing a tpool_t by value shares the same underlying pool.
 */
typedef struct tpool {
/* pool characteristics */
/* Number of worker threads requested at tpool_init time. */
int num_threads;
/*
 * Number of slots in the queue array. tpool_init stores the requested
 * capacity plus one here; full() treats "rear + 1 == front" (mod this
 * value) as full, so one slot always stays unused.
 */
int max_queue_size;
/* pool state */
/* Array of num_threads worker thread ids. */
pthread_t *tpid;
/* Array of max_queue_size work slots, used as a circular buffer. */
tpool_work_t *queue;
/* Circular-buffer indices into queue; the queue is empty when they are equal. */
int front, rear;
/* Remaining tasks may still be finished, but no new tasks can be added. */
int queue_closed;
/* Remaining tasks are abandoned; the pool closes immediately. */
int shutdown;
/* pool synchronization */
/* Mutex taken by tpool_add_work before it inspects the queue state. */
pthread_mutex_t queue_lock;
/* NOTE(review): presumably signalled when a task is enqueued -- confirm in tpool.c. */
pthread_cond_t queue_has_task;
/* tpool_add_work waits on this while the queue is full. */
pthread_cond_t queue_has_space;
/* NOTE(review): presumably signalled when the queue drains -- confirm in tpool.c. */
pthread_cond_t queue_empty;
} *tpool_t;
/*
 * Create a pool with num_threads worker threads and room for max_queue_size
 * queued tasks, and store its handle in *tpoolp. The implementation
 * terminates the process if any allocation or pthread call fails.
 */
void tpool_init(tpool_t *tpoolp,int num_threads, int max_queue_size);
/*
 * Queue routine(arg) for execution by the pool. Blocks while the queue is
 * full, unless the pool is shutting down or its queue has been closed.
 * NOTE(review): the return-value convention is not visible here -- confirm
 * in tpool.c.
 */
int tpool_add_work(tpool_t tpool,void(*routine)(void *), void *arg);
/*
 * Tear down the pool. NOTE(review): finish presumably selects between
 * draining the remaining tasks (queue_closed) and dropping them (shutdown);
 * confirm against the implementation.
 */
int tpool_destroy(tpool_t tpool,int finish);
2 tpool.c
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <string.h>
#include <pthread.h>
#include "tpool.h"
/* DEBUG is defined unconditionally here, so debug() tracing is always on. */
#define DEBUG
#if defined(DEBUG)
/*
 * debug(fmt, ...): print "###<thread id>.<function>: <message>" plus a
 * newline to stdout and flush it. The whole sequence runs between
 * flockfile/funlockfile on stdout, so the separate printf/putchar calls are
 * emitted under a single stream lock.
 * The first argument is passed straight to printf as the format string.
 * NOTE(review): casting pthread_self() to void * assumes pthread_t is a
 * pointer or integer type; that is not guaranteed on every platform.
 */
#define debug(...) do { \
flockfile(stdout); \
printf("###%p.%s: ", (void *)pthread_self(), __func__); \
printf(__VA_ARGS__); \
putchar('\n'); \
fflush(stdout); \
funlockfile(stdout); \
} while (0)
#else
/* With DEBUG undefined, debug() expands to nothing. */
#define debug(...)
#endif
/*
 * Worker thread entry point. tpool_init passes it to pthread_create with the
 * pool handle as its argument; the definition appears later in this file.
 */
void *tpool_thread(void *);
/*
 * Abort pool construction if a pthread_* call failed.
 *
 * pthread functions report failure by RETURNING an error number (they do not
 * return -1 and do not set errno), so the message is built with strerror(rc)
 * rather than perror().
 */
static void tpool_init_check(const char *what, int rc)
{
    if (rc == 0) {
        return;
    }
    fprintf(stderr, "%s: %s\n", what, strerror(rc));
    exit(EXIT_FAILURE);
}

/*
 * Create a thread pool and start its worker threads.
 *
 * tpoolp             - out parameter; receives the new pool handle.
 * num_worker_threads - number of worker threads to create.
 * max_queue_size     - number of tasks the queue can hold. One extra slot is
 *                      allocated because the circular buffer keeps one slot
 *                      unused to tell "full" apart from "empty".
 *
 * There is no error return: on any allocation or pthread failure a message
 * is written to stderr and the process exits with EXIT_FAILURE. Nothing is
 * freed on those paths because the process is terminating anyway, and worker
 * threads that are already running may still be reading the pool.
 */
void tpool_init(tpool_t *tpoolp, int num_worker_threads, int max_queue_size)
{
    int i;
    tpool_t pool;

    pool = malloc(sizeof *pool);
    if (pool == NULL) {
        perror("malloc");
        exit(EXIT_FAILURE);
    }

    pool->num_threads = num_worker_threads;
    pool->max_queue_size = max_queue_size + 1;
    pool->tpid = NULL;
    pool->queue = NULL;
    pool->front = 0;
    pool->rear = 0;
    pool->queue_closed = 0;
    pool->shutdown = 0;

    tpool_init_check("pthread_mutex_init",
                     pthread_mutex_init(&pool->queue_lock, NULL));
    tpool_init_check("pthread_cond_init",
                     pthread_cond_init(&pool->queue_has_space, NULL));
    tpool_init_check("pthread_cond_init",
                     pthread_cond_init(&pool->queue_has_task, NULL));
    tpool_init_check("pthread_cond_init",
                     pthread_cond_init(&pool->queue_empty, NULL));

    pool->queue = malloc(sizeof *pool->queue * pool->max_queue_size);
    if (pool->queue == NULL) {
        perror("malloc");
        exit(EXIT_FAILURE);
    }

    pool->tpid = malloc(sizeof *pool->tpid * num_worker_threads);
    if (pool->tpid == NULL) {
        perror("malloc");
        exit(EXIT_FAILURE);
    }

    /* Each worker receives the pool handle as its start argument. */
    for (i = 0; i < num_worker_threads; i++) {
        tpool_init_check("pthread_create",
                         pthread_create(&pool->tpid[i], NULL, tpool_thread,
                                        (void *)pool));
    }

    *tpoolp = pool;
}
/*
 * Return non-zero when the circular queue holds no tasks, i.e. when the
 * front and rear indices coincide. Reads pool state without locking.
 */
int empty(tpool_t pool)
{
    int is_empty = (pool->rear == pool->front);

    return is_empty;
}
/*
 * Return non-zero when the circular queue cannot accept another task:
 * advancing rear by one slot (with wrap-around) would land on front.
 * Reads pool state without locking.
 */
int full(tpool_t pool)
{
    int next_rear = (pool->rear + 1) % pool->max_queue_size;

    return next_rear == pool->front;
}
/*
 * Return the number of tasks currently queued: the distance from front to
 * rear, taken modulo the slot count so a wrapped rear index still gives a
 * non-negative result. Reads pool state without locking.
 */
int size(tpool_t pool)
{
    int slots = pool->max_queue_size;
    int span = pool->rear + slots - pool->front;

    return span % slots;
}
int tpool_add_work(tpool_t tpool, void(*routine)(void *), void *arg)
{
tpool_work_t *temp;
pthread_mutex_lock(&tpool->queue_lock);
while (full(tpool) && !tpool->shutdown && !tpool->queue_closed) {
pthread_cond_wait(&tpool->queue_has_space, &tpool->queue_lock);
}
if (tpool->shutdown || tpool