基本功能

1. 实现一个线程的队列,队列中的线程启动后不再释放;

2. 没有任务执行时,线程处于pending状态,等待唤醒,不占cpu;

3. 当有任务需要执行时,从线程队列中取出一个线程执行任务;

4. 任务执行完成后线程再次进入pending状态,等待唤醒;

 

扩展功能

1. 线程的队列大小可设置;

2. 最大可创建的线程数可设置;

3. 根据运行需求,按需步进启动线程,避免大量线程一直处于pending状态,占用资源;

 

关键代码分析

数据结构

 1 /* 线程执行的任务参数 */
 2 typedef struct
 3 {
 4     void (*func)(void*, void*);    /* 任务函数指针 */
 5     void *arg1;                    /* 任务函数第一个参数 */
 6     void *arg2;                    /* 任务函数第二个参数 */
 7 }tThreadTaskInfo;
 8 
 9 /* 线程池参数 */
10 typedef struct
11 {
12     pthread_mutex_t lock;          /* 线程池互斥锁 */
13     pthread_cond_t cond;           /* 线程池同步信号 */
14 
15     pthread_t *threads;            /* 保存线程池创建的所有线程 */
16     int32_t threadMaxNum;          /* 最大可创建线程数 */
17     int32_t threadStartStep;       /* 一次启动线程的个数 */
18     int32_t threadStartCnt;        /* 已启动线程个数 */
19     int32_t threadPendCnt;         /* 已启动但是处于Pending状态的线程 */
20 
21     tThreadTaskInfo *taskQueue;    /* 等待执行的任务队列 */
22     int32_t taskQueueSize;         /* 任务队列的大小 */
23     int32_t taskQueueHead;         /* 当前任务队列头索引 */
24     int32_t taskQueueTail;         /* 当前任务队列尾索引 */
25     int32_t taskPendCnt;           /* 等待执行的任务个数 */
26 
27     int32_t isShutdown;            /* 线程池正在关闭 */
28 }tThreadpoolInfo;

 

创建线程池

  • 创建线程池时只分配了存储pthread_t的空间,但是不启动线程,后面根据需求步进启动;
 1 /************************************
 2  * 创建线程池
 3  *
 4  * @threadMaxNum     -- 最大可创建线程个数
 5  * @threadStartStep  -- 一次启动线程的个数
 6  * @taskQueueSize    -- 任务队列的大小
 7  *
 8  * @Retuen  --  成功:线程池的引用
 9  *              失败:NULL
10  * **********************************/
11 tThreadpoolInfo* threadpool_create(
12     int32_t threadMaxNum,
13     int32_t threadStartStep,
14     int32_t taskQueueSize)
15 {
16     tThreadpoolInfo *threadpool = NULL;
17 
18     if ((0 >= threadMaxNum)
19         || (0 >= threadStartStep)
20         || (0 >= taskQueueSize))
21     {
22         THREADPOOL_ERR("invalid param.\r\n");
23         goto error_exit;
24     }
25 
26     threadpool = (tThreadpoolInfo *)malloc(sizeof(tThreadpoolInfo));
27     if (NULL == threadpool)
28     {
29         THREADPOOL_ERR("malloc threadpool failed.\r\n");
30         goto error_exit;
31     }
32 
33     memset(threadpool, 0, sizeof(tThreadpoolInfo));
34     threadpool->threadMaxNum = threadMaxNum;
35     threadpool->threadStartStep = threadStartStep;
36     threadpool->taskQueueSize = taskQueueSize;
37 
38     /* 分配线程存储资源 */
39     threadpool->threads = (pthread_t *)calloc(threadMaxNum, sizeof(pthread_t));
40     if (NULL == threadpool->threads)
41     {
42         THREADPOOL_ERR("malloc threads failed.\r\n");
43         goto error_exit;
44     }
45 
46     /* 分配任务队列 */
47     threadpool->taskQueue = (tThreadTaskInfo *)calloc(taskQueueSize, sizeof(tThreadTaskInfo));
48     if (NULL == threadpool->taskQueue)
49     {
50         THREADPOOL_ERR("malloc task queue failed.\r\n");
51         goto error_exit;
52     }
53 
54     /* 初始化互斥信号量和同步信号 */
55     if (0 != THREADPOOL_LOCK_INIT(threadpool))
56     {
57         THREADPOOL_ERR("mutex init failed.\r\n");
58         goto error_exit;
59     }
60 
61     if (0 != THREADPOOL_COND_INIT(threadpool))
62     {
63         THREADPOOL_ERR("cond init failed.\r\n");
64         goto error_exit;
65     }
66 
67     return threadpool;
68 
69 error_exit:
70 
71     if (threadpool != NULL)
72     {
73         threadpool_free(threadpool);
74     }
75 
76     return NULL;
77 }

 

向线程池添加任务

  • 查看等待队列是否有空闲,如果没有空闲则返回错误;
  • 查看当前有没有处于pending的线程,如果没有则按照步进启动新的线程,如果已达到最大线程数则返回错误;
  • 将任务添加到队列中,并唤醒一个线程执行任务;
 1 /************************************
 2  * 向线程池添加任务
 3  *
 4  * @threadpool -- 线程池引用
 5  * @taskfunc   -- 任务回调函数
 6  * @arg1       -- 任务第一个参数
 7  * @arg1       -- 任务第二个参数
 8  *
 9  * @Return  --  成功: 0
10  *              失败: -1
11  * **********************************/
12 int32_t threadpool_addtask(
13     tThreadpoolInfo *threadpool,
14     THREADPOOLTASKFUNC taskfunc,
15     void *arg1,
16     void *arg2)
17 {
18     int32_t ret = 0;
19 
20     if ((NULL == threadpool) || (NULL == taskfunc))
21     {
22         THREADPOOL_ERR("invalid param.\r\n");
23         return -1;
24     }
25 
26     THREADPOOL_LOCK(threadpool);
27 
28     do
29     {
30         if (threadpool->isShutdown)
31         {
32             THREADPOOL_ERR("threadpool is shutdown.\r\n");
33             ret = -1;
34             break;
35         }
36 
37         /* 判断等待执行的任务队列是否满 */
38         if (threadpool->taskPendCnt == threadpool->taskQueueSize)
39         {
40             THREADPOOL_ERR("task queue is full.\r\n");
41             ret = -1;
42             break;
43         }
44 
45         /* 如果pending状态的线程已用完,则启动新的线程 */
46         if (threadpool->threadPendCnt <= 0)
47         {
48             if (0 != threadpool_start(threadpool))
49             {
50                 ret = -1;
51                 break;
52             }
53         }
54 
55         /* 将任务放入对尾 */
56         threadpool->taskQueue[threadpool->taskQueueTail].func = taskfunc;
57         threadpool->taskQueue[threadpool->taskQueueTail].arg1 = arg1;
58         threadpool->taskQueue[threadpool->taskQueueTail].arg2 = arg2;
59 
60         threadpool->taskQueueTail = (threadpool->taskQueueTail + 1) % threadpool->taskQueueSize;
61         threadpool->taskPendCnt++;
62 
63         /* 唤醒一个线程执行任务 */
64         THREADPOOL_COND_SIGNAL(threadpool);
65 
66     } while(0);
67 
68     THREADPOOL_UNLOCK(threadpool);
69     return ret;
70 }

 

线程的回调函数

  • 线程第一次启动和被唤醒后检查队列中是否有需要执行的任务,如果没有则继续等待唤醒;
  • 如果有需要执行的任务,则从队列中取一个任务并执行;
  • 如果线程池已销毁,则退出线程;
 1 /************************************
 2  * 线程回调函数
 3  * 等待线程池分配任务并执行分配的任务
 4  *
 5  * @arg  -- 线程池引用
 6  * **********************************/
 7 void* thread_callback(void *arg)
 8 {
 9     tThreadpoolInfo *threadpool = (tThreadpoolInfo *)arg;
10     tThreadTaskInfo task;
11 
12     while (1)
13     {
14         THREADPOOL_LOCK(threadpool);
15 
16         /* 等待任务分配的信号 
17          * 如果当前没有等待执行的任务,并且线程池没有关闭则继续等待信号 */
18         while ((0 == threadpool->taskPendCnt)
19                 && (0 == threadpool->isShutdown))
20         {
21             THREADPOOL_COND_WAIT(threadpool);
22         }
23 
24         /* 如果线程池已关闭,则退出线程  */
25         if (threadpool->isShutdown)
26             break;
27 
28         /* 取任务队列中当前第一个任务 */
29         task.func = threadpool->taskQueue[threadpool->taskQueueHead].func;
30         task.arg1 = threadpool->taskQueue[threadpool->taskQueueHead].arg1;
31         task.arg2 = threadpool->taskQueue[threadpool->taskQueueHead].arg2;
32 
33         threadpool->taskQueueHead = (threadpool->taskQueueHead + 1) % threadpool->taskQueueSize;
34         threadpool->taskPendCnt--;
35         threadpool->threadPendCnt--;
36 
37         THREADPOOL_UNLOCK(threadpool);
38 
39         /* 执行任务 */
40         (*(task.func))(task.arg1, task.arg2);
41 
42         /* 任务执行完成后,线程进入pending状态 */
43         THREADPOOL_LOCK(threadpool);
44         threadpool->threadPendCnt++;
45         THREADPOOL_UNLOCK(threadpool);
46     }
47 
48     threadpool->threadStartCnt--;
49     THREADPOOL_UNLOCK(threadpool);
50 
51     pthread_exit(NULL);
52 }

 

线程池销毁

  • 销毁为确保资源释放,需要唤醒所有线程,并等待所有线程退出;
 1 /************************************
 2  * 删除线程池
 3  *
 4  * @threadpool  -- 线程池引用
 5  * **********************************/
 6 int32_t threadpool_destroy(tThreadpoolInfo *threadpool)
 7 {
 8     int32_t ret = 0;
 9     int32_t i = 0;
10 
11     if (NULL == threadpool)
12     {
13         THREADPOOL_ERR("invalid param.\r\n");
14         return -1;
15     }
16 
17     THREADPOOL_LOCK(threadpool);
18 
19     do
20     {
21         if (threadpool->isShutdown)
22         {
23             THREADPOOL_UNLOCK(threadpool);
24             break;
25         }
26 
27         threadpool->isShutdown = 1;
28 
29         /* 唤醒所有线程 */
30         if (0 != THREADPOOL_COND_BROADCAST(threadpool))
31         {
32             THREADPOOL_ERR("cond broadcast failed.\r\n");
33             threadpool->isShutdown = 0;
34             continue;
35         }
36 
37         THREADPOOL_UNLOCK(threadpool);
38 
39         /* 等待所有进程退出 */
40         for (i = 0; i < threadpool->threadStartCnt; i++)
41         {
42             pthread_cancel(threadpool->threads[i]);
43             pthread_join(threadpool->threads[i], NULL);
44         }
45 
46     }while(0);
47 
48     if (0 != ret)
49     {
50         threadpool->isShutdown = 0;
51         return ret;
52     }
53 
54     threadpool_free(threadpool);
55     return ret;
56 }

 

线程池测试

  • 创建最大线程数=256,队列大小=64,启动步进=8 的线程池;
  • 向线程池添加1024个任务,如果添加失败则等待1秒再添加;
  • 验证1024个任务是否均能执行;
 1 /***********************************
 2  * Filename : test_main.c
 3  * Author :   taopeng
 4  * *********************************/
 5 
 6 #include <stdio.h>
 7 #include <stdlib.h>
 8 #include <string.h>
 9 #include <unistd.h>
10 
11 #include "threadpool.h"
12 
13 void test_task(void *arg)
14 {
15     long id = (long)arg;
16 
17     printf("task[%ld] enter\r\n", id);
18     sleep(3);
19 
20     return;
21 }
22 
23 int32_t main(int32_t argc, char *argv[])
24 {
25     tThreadpoolInfo *threadpool;
26     long id;
27 
28     threadpool = threadpool_create(128, 8, 64);
29     if (NULL == threadpool)
30         return -1;
31 
32     for (id = 1; id <= 1024;)
33     {
34         if (0 != threadpool_addtask(threadpool, (THREADPOOLTASKFUNC)test_task, (void *)id, NULL))
35         {
36             sleep(1);
37             continue;
38         }
39 
40         id++;
41     }
42 
43     sleep(30);
44 
45     threadpool_destroy(threadpool);
46     return 0;
47 }

 

代码实例链接

https://gitee.com/github-18274965/threadpool.git

 

版权声明:本文为tp1226原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/tp1226/p/13996788.html