日期:2014-05-16  浏览次数:20765 次

Linux C线程池实现

三个文件

?

1 tpool.h

typedef struct tpool_work {
    void               (*routine)(void *);
    void                *arg;
    struct tpool_work   *next;
} tpool_work_t;

typedef struct tpool {
    /* pool characteristics */
    int                 num_threads;
    int                 max_queue_size;
    /* pool state */
    pthread_t           *tpid;
    tpool_work_t        *queue;
    int                 front, rear;
    /* 剩下的任务可以做完, 但不能再加新的任务 */
    int                 queue_closed;   
    /* 剩下的任务都不做了, 直接关闭 */
    int                 shutdown;       
    /* pool synchronization */
    pthread_mutex_t     queue_lock;
    pthread_cond_t      queue_has_task;
    pthread_cond_t      queue_has_space;
    pthread_cond_t      queue_empty;
} *tpool_t;

void tpool_init(tpool_t *tpoolp,int num_threads, int max_queue_size);

int tpool_add_work(tpool_t tpool,void(*routine)(void *), void *arg);

int tpool_destroy(tpool_t tpool,int finish);

?2 tpool.c

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <string.h>
#include <pthread.h>
#include "tpool.h"

#define DEBUG

#if defined(DEBUG)
#define debug(...) do { \
    flockfile(stdout); \
    printf("###%p.%s: ", (void *)pthread_self(), __func__); \
    printf(__VA_ARGS__); \
    putchar('\n'); \
    fflush(stdout); \
    funlockfile(stdout); \
} while (0)
#else
#define debug(...)
#endif

void *tpool_thread(void *);

void tpool_init(tpool_t *tpoolp, int num_worker_threads, int max_queue_size)
{
    int i;
    tpool_t pool;

    pool = (tpool_t)malloc(sizeof(struct tpool));
    if (pool == NULL) {
        perror("malloc");
        exit(0);
    }

    pool->num_threads = 0;
    pool->max_queue_size = max_queue_size + 1;
    pool->num_threads = num_worker_threads;
    pool->tpid = NULL;
    pool->front = 0;
    pool->rear = 0;
    pool->queue_closed = 0;
    pool->shutdown = 0;

    if (pthread_mutex_init(&pool->queue_lock, NULL) == -1) {
        perror("pthread_mutex_init");
        free(pool);
        exit(0);
    }
    if (pthread_cond_init(&pool->queue_has_space, NULL) == -1) {
        perror("pthread_mutex_init");
        free(pool);
        exit(0);
    }
    if (pthread_cond_init(&pool->queue_has_task, NULL) == -1) {
        perror("pthread_mutex_init");
        free(pool);
        exit(0);
    }
    if (pthread_cond_init(&pool->queue_empty, NULL) == -1) {
        perror("pthread_mutex_init");
        free(pool);
        exit(0);
    }

    if ((pool->queue = malloc(sizeof(struct tpool_work) * 
                    pool->max_queue_size)) == NULL) {
        perror("malloc");
        free(pool);
        exit(0);
    }

    if ((pool->tpid = malloc(sizeof(pthread_t) * num_worker_threads)) == NULL) {
        perror("malloc");
        free(pool);
        free(pool->queue);
        exit(0);
    }

    for (i = 0; i < num_worker_threads; i++) {
        if (pthread_create(&pool->tpid[i], NULL, tpool_thread, 
                    (void *)pool) != 0) {
            perror("pthread_create");
            exit(0);
        }
    }

    *tpoolp = pool;
}


int empty(tpool_t pool)
{
    return  pool->front == pool->rear;
}

int full(tpool_t pool)
{
    return ((pool->rear + 1) % pool->max_queue_size == pool->front);
}

int size(tpool_t pool)
{
    return (pool->rear + pool->max_queue_size -
                pool->front) % pool->max_queue_size;
}

int tpool_add_work(tpool_t tpool, void(*routine)(void *), void *arg)
{
    tpool_work_t *temp;

    pthread_mutex_lock(&tpool->queue_lock);

    while (full(tpool) && !tpool->shutdown && !tpool->queue_closed) {
        pthread_cond_wait(&tpool->queue_has_space, &tpool->queue_lock);
    }

    if (tpool->shutdown || tpool