日期:2014-05-16 浏览次数:20765 次
三个文件
1. tpool.h
#ifndef TPOOL_H
#define TPOOL_H

/* The pool structure embeds pthread types, so pull them in here instead of
 * relying on every includer to include <pthread.h> first. */
#include <pthread.h>

/*
 * One unit of work: a callback plus the opaque argument supplied together
 * with it through tpool_add_work().
 */
typedef struct tpool_work {
    void (*routine)(void *);   /* callback supplied by the submitter */
    void *arg;                 /* argument supplied alongside routine */
    struct tpool_work *next;   /* link to another work item */
} tpool_work_t;

/*
 * Thread pool state. Callers hold it through the pointer typedef tpool_t.
 *
 * The pending work is kept in `queue`, indexed by `front` and `rear`;
 * the queue is treated as empty when front == rear.
 */
typedef struct tpool {
    /* pool characteristics */
    int num_threads;           /* number of worker threads */
    int max_queue_size;        /* number of slots in `queue` */

    /* pool state */
    pthread_t *tpid;           /* array of worker thread ids */
    tpool_work_t *queue;       /* array of work slots */
    int front, rear;           /* queue indices */
    /* Work already queued may still be completed, but no new work
     * may be added. */
    int queue_closed;
    /* Remaining work is abandoned; the pool shuts down directly. */
    int shutdown;

    /* pool synchronization */
    pthread_mutex_t queue_lock;      /* guards the queue state */
    pthread_cond_t queue_has_task;   /* presumably signalled when work is queued */
    pthread_cond_t queue_has_space;  /* waited on by producers while the queue is full */
    pthread_cond_t queue_empty;      /* presumably signalled when the queue drains */
} *tpool_t;

/*
 * Create a pool with `num_threads` workers and room for `max_queue_size`
 * pending work items, and store the handle in *tpoolp.
 */
void tpool_init(tpool_t *tpoolp, int num_threads, int max_queue_size);

/*
 * Queue routine(arg) on the pool.
 * NOTE(review): the meaning of the int result is not visible from this
 * header; confirm against the implementation.
 */
int tpool_add_work(tpool_t tpool, void (*routine)(void *), void *arg);

/*
 * Tear the pool down.
 * NOTE(review): `finish` presumably selects between completing queued work
 * (see queue_closed) and abandoning it (see shutdown); confirm against
 * the implementation.
 */
int tpool_destroy(tpool_t tpool, int finish);

#endif /* TPOOL_H */
2. tpool.c
#include <stdlib.h> #include <stdio.h> #include <unistd.h> #include <sys/types.h> #include <string.h> #include <pthread.h> #include "tpool.h" #define DEBUG #if defined(DEBUG) #define debug(...) do { \ flockfile(stdout); \ printf("###%p.%s: ", (void *)pthread_self(), __func__); \ printf(__VA_ARGS__); \ putchar('\n'); \ fflush(stdout); \ funlockfile(stdout); \ } while (0) #else #define debug(...) #endif void *tpool_thread(void *); void tpool_init(tpool_t *tpoolp, int num_worker_threads, int max_queue_size) { int i; tpool_t pool; pool = (tpool_t)malloc(sizeof(struct tpool)); if (pool == NULL) { perror("malloc"); exit(0); } pool->num_threads = 0; pool->max_queue_size = max_queue_size + 1; pool->num_threads = num_worker_threads; pool->tpid = NULL; pool->front = 0; pool->rear = 0; pool->queue_closed = 0; pool->shutdown = 0; if (pthread_mutex_init(&pool->queue_lock, NULL) == -1) { perror("pthread_mutex_init"); free(pool); exit(0); } if (pthread_cond_init(&pool->queue_has_space, NULL) == -1) { perror("pthread_mutex_init"); free(pool); exit(0); } if (pthread_cond_init(&pool->queue_has_task, NULL) == -1) { perror("pthread_mutex_init"); free(pool); exit(0); } if (pthread_cond_init(&pool->queue_empty, NULL) == -1) { perror("pthread_mutex_init"); free(pool); exit(0); } if ((pool->queue = malloc(sizeof(struct tpool_work) * pool->max_queue_size)) == NULL) { perror("malloc"); free(pool); exit(0); } if ((pool->tpid = malloc(sizeof(pthread_t) * num_worker_threads)) == NULL) { perror("malloc"); free(pool); free(pool->queue); exit(0); } for (i = 0; i < num_worker_threads; i++) { if (pthread_create(&pool->tpid[i], NULL, tpool_thread, (void *)pool) != 0) { perror("pthread_create"); exit(0); } } *tpoolp = pool; } int empty(tpool_t pool) { return pool->front == pool->rear; } int full(tpool_t pool) { return ((pool->rear + 1) % pool->max_queue_size == pool->front); } int size(tpool_t pool) { return (pool->rear + pool->max_queue_size - pool->front) % pool->max_queue_size; } int 
tpool_add_work(tpool_t tpool, void(*routine)(void *), void *arg) { tpool_work_t *temp; pthread_mutex_lock(&tpool->queue_lock); while (full(tpool) && !tpool->shutdown && !tpool->queue_closed) { pthread_cond_wait(&tpool->queue_has_space, &tpool->queue_lock); } if (tpool->shutdown || tpool