
- 前言
- 一、实验目的
- 二、实验工具与设备
- 三、实验预备知识
- 四、实验内容和步骤
- 五、实验代码及步骤截图
为了帮助同学们完成痛苦的实验课程设计,本作者将其作出的实验结果及代码贴至CSDN中,供同学们学习参考。如有不足或描述不完善之处,敬请各位指出,欢迎各位的斧正!
一、实验目的- 理解Linux的线程原理。
- 掌握和使用线程函数实现线程的 *** 作。
- 掌握和使用互斥锁和信号量实现线程同步。
- 掌握线程池。
装有Linux系统的计算机
三、实验预备知识线程可以提高应用程序在多核环境下处理诸如文件I/O或者socket I/O等会产生堵塞的情况的表现性能。在Unix系统中,一个进程包含很多东西,包括可执行程序以及一大堆的诸如文件描述符地址空间等资源。在很多情况下,完成相关任务的不同代码间需要交换数据。如果采用多进程的方式,那么通信就需要在用户空间和内核空间进行频繁的切换,开销很大。但是如果使用多线程的方式,因为可以使用共享的全局变量,所以线程间的通信(数据交换)变得非常高效。
头文件:#include
需要的编译条件:-pthread
线程结构体pthread_attr_t中记录线程的信息。
typedef struct
{
int detachstate; 线程的分离状态
int schedpolicy; 线程调度策略
structsched_param schedparam; 线程的调度参数
int inheritsched; 线程的继承性
int scope; 线程的作用域
size_t guardsize; 线程栈末尾的警戒缓冲区大小
int stackaddr_set;
void* stackaddr; 线程栈的位置
size_t stacksize; 线程栈的大小
}pthread_attr_t;
线程相关的接口函数:
对线程属性初始化/去除初始化
int pthread_attr_init(pthread_attr_t *attr); int pthread_attr_destroy(pthread_attr_t *attr);
创建线程
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
向线程发送终止信号
int pthread_cancel(pthread_t thread);
使线程进入分离态
int pthread_detach(pthread_t thread);
比较两个线程的标识符
int pthread_equal(pthread_t t1, pthread_t t2);
使线程退出
void pthread_exit(void *retval);
通过结构体初始化,用于明确指定线程属性
int pthread_getattr_np(pthread_t thread, pthread_attr_t *attr);
等待指定线程的结束
int pthread_join(pthread_t thread, void **retval);
返回线程当前的描述符
pthread_t pthread_self(void);四、实验内容和步骤
- 创建线程。
#include#include void *mythread1(void) { int i; for(i = 0; i < 10; i++) { printf("This is the pthread1,i=%dn",i); sleep(1); } } void *mythread2(void) { int i; for(i = 0; i < 10; i++) { printf("This is the pthread1,i=%dn",i ); sleep(1); } } int main(int argc, const char *argv[]) { int i = 0; int ret = 0; pthread_t id1,id2; ret = pthread_create(&id1, NULL, (void *)mythread1,NULL); if(ret) { printf("Create pthread error!n"); return 1; } ret = pthread_create(&id2, NULL, (void *)mythread2,NULL); if(ret) { printf("Create pthread error!n"); return 1; } pthread_join(id1,NULL); pthread_join(id2,NULL); return 0; }
执行以下命令编译程序并执行:gcc -o Createthread Createthread.c -lpthread
- 创建两个线程实现对一个数的递加。
#include#include #include #include #define MAX 20 pthread_t thread[2]; //两个线程 pthread_mutex_t mut; int number=0; int i; void *thread1() { printf ("thread1 : this is the thread1n"); for (i = 0; i < MAX; i++) //模拟线程执行时间 { printf("thread1 : number = %dn",number); pthread_mutex_lock(&mut); number++; pthread_mutex_unlock(&mut); sleep(2); } printf("thread1 :主函数在等我完成任务吗?n"); pthread_exit(NULL); } void *thread2() { printf(""thread2 : this is the thread2n"); for (i = 0; i < MAX; i++) { printf("thread2 : number = %dn",number); pthread_mutex_lock(&mut); number++; pthread_mutex_unlock(&mut); sleep(3); } printf("thread2 :主函数在等我完成任务吗?n"); pthread_exit(NULL); } void thread_create(void) //创建两个线程 { int temp; memset(&thread, 0, sizeof(thread)); //comment1 if((temp = pthread_create(&thread[0], NULL, thread1, NULL)) != 0) //comment2 printf("线程1创建失败n"); else printf("线程1被创建n"); if((temp = pthread_create(&thread[1], NULL, thread2, NULL)) != 0) //comment3 printf("线程2创建失败n"); else printf("线程2被创建n"); } void thread_wait(void) { if(thread[0] !=0) { //comment4 pthread_join(thread[0],NULL); printf("线程1已经结束n"); } if(thread[1] !=0) { //comment5 pthread_join(thread[1],NULL); printf("线程2已经结束n"); } } int main() { pthread_mutex_init(&mut,NULL); printf("创建线程n"); thread_create(); printf("等待线程完成任务n"); thread_wait(); return 0; }
通过以下命令编译:gcc -lpthread -o thread_example thread_example.c
- 线程同步
未同步示例程序:
#include#include #include #include #include #define LEN 100000 int num = 0; void* thread_func(void* arg) { for (int i = 0; i< LEN; ++i) { num += 1; } return NULL; } int main() { pthread_t tid1, tid2; pthread_create(&tid1, NULL, (void*)thread_func, NULL); pthread_create(&tid2, NULL, (void*)thread_func, NULL); char* rev = NULL; pthread_join(tid1, (void *)&rev); pthread_join(tid2, (void *)&rev); printf("correct result=%d, wrong result=%d.n", 2*LEN, num); return 0; }
运行结果:correct result=200000, wrong result=106860.
通过互斥解决同步问题的程序:
#include#include #include #include #include #define LEN 100000 int num = 0; void* thread_func(void* arg) { pthread_mutex_t* p_mutex = (pthread_mutex_t*)arg; for (int i = 0; i< LEN; ++i) { pthread_mutex_lock(p_mutex); num += 1; pthread_mutex_unlock(p_mutex); } return NULL; } int main() { pthread_mutex_t m_mutex; pthread_mutex_init(&m_mutex, NULL); pthread_t tid1, tid2; pthread_create(&tid1, NULL, (void*)thread_func, (void*)&m_mutex); pthread_create(&tid2, NULL, (void*)thread_func, (void*)&m_mutex); pthread_join(tid1, NULL); pthread_join(tid2, NULL); pthread_mutex_destroy(&m_mutex); printf("correct result=%d, result=%d.n", 2*LEN, num); return 0; }
运行结果:correct result=200000, result=200000.
- 编写信号量使用程序
#include#include #include #include #include #include #include #include #define NUM 5 int queue[NUM]; sem_t psem, csem; void producer(void* arg) { int pos = 0; int num, count = 0; for (int i=0; i<12; ++i) { num = rand() % 100; count += num; sem_wait(&psem); queue[pos] = num; sem_post(&csem); printf("producer: %dn", num); pos = (pos+1) % NUM; sleep(rand()%2); } printf("producer count=%dn", count); } void consumer(void* arg){ int pos = 0; int num, count = 0; for (int i=0; i<12; ++i) { sem_wait(&csem); num = queue[pos]; sem_post(&psem); printf("consumer: %dn", num); count += num; pos = (pos+1) % NUM; sleep(rand()%3); } printf("consumer count=%dn", count); } int main() { sem_init(&psem, 0, NUM); sem_init(&csem, 0, 0); pthread_t tid[2]; pthread_create(&tid[0], NULL, (void*)producer, NULL); pthread_create(&tid[1], NULL, (void*)consumer, NULL); pthread_join(tid[0], NULL); pthread_join(tid[1], NULL); sem_destroy(&psem); sem_destroy(&csem); return 0; }
- 编写线程池程序
1.理解并运行以下线程实现
#include#include #include #include #include #include #include typedef struct condition { pthread_mutex_t pmutex; pthread_cond_t pcond; }condition_t; typedef struct task { void *(*run)(void *arg); void *arg; struct task *next; }task_t; typedef struct threadpool { condition_t ready; task_t *first; task_t *last; int counter; int idle; int max_threads; int quit; }threadpool_t; int condition_init(condition_t *cond) { int status; if((status = pthread_mutex_init(&cond->pmutex,NULL)))//返回0代表初始化成功 return status; if((status = pthread_cond_init(&cond->pcond,NULL))) return status; return 0; } int condition_lock(condition_t *cond) { return pthread_mutex_lock(&cond -> pmutex); } int condition_unlock(condition_t *cond) { return pthread_mutex_unlock(&cond -> pmutex); } int condition_wait(condition_t *cond) { return pthread_cond_wait(&cond -> pcond,&cond -> pmutex); } int condition_timewait(condition_t *cond,const struct timespec *abstime) { return pthread_cond_timedwait(&cond->pcond,&cond->pmutex,abstime); } int condition_signal(condition_t *cond) { return pthread_cond_signal(&cond->pcond); } int condition_broadcast(condition_t *cond) { return pthread_cond_broadcast(&cond -> pcond); } int condition_destory(condition_t *cond) { int status; if((status = pthread_mutex_destroy(&cond -> pmutex))) return status; if((status = pthread_cond_destroy(&cond -> pcond))) return status; return 0; } void *thread_routine(void *arg) { struct timespec abstime; int timeout; printf("thread 0x%0x is startingn",(int)pthread_self()); threadpool_t *pool = (threadpool_t *)arg; while(1) { timeout = 0; condition_lock(&pool -> ready); pool -> idle++; //等待队列有任务到来或者线程池销毁的通知 while(pool -> first == NULL && !pool -> quit) { printf("thread 0x%0x is waitingn",(int)pthread_self()); clock_gettime(CLOCK_REALTIME,&abstime); abstime.tv_sec += 2; int status=condition_timewait(&pool -> ready,&abstime); if(status == ETIMEDOUT) { printf("thread 0x%0x is wait timed outn",(int)pthread_self()); timeout = 1; break; } } //等到到条件,处于工作状态 pool -> idle--; if(pool -> first != NULL) { task_t *t = pool -> first; pool -> first = t -> next; //需要先解锁,以便添加新任务。其他消费者线程能够进入等待任务。 condition_unlock(&pool -> ready); t -> run(t->arg); free(t); condition_lock(&pool -> ready); } //等待线程池销毁的通知 if(pool -> quit && pool ->first == NULL) { pool -> counter--; if(pool->counter == 0) { condition_signal(&pool -> ready); } condition_unlock(&pool->ready); //跳出循环之前要记得解锁 break; } if(timeout &&pool -> first ==NULL) { pool -> counter--; condition_unlock(&pool->ready); //跳出循环之前要记得解锁 break; } condition_unlock(&pool -> ready); } printf("thread 0x%0x is exitingn",(int)pthread_self()); return NULL; } //初始化 void threadpool_init(threadpool_t *pool, int threads) { condition_init(&pool -> ready); pool -> first = NULL; pool -> last = NULL; pool -> counter = 0; pool -> idle = 0; pool -> max_threads = threads; pool -> quit = 0; } //加任务 void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg),void *arg) { task_t *newstask = (task_t *)malloc(sizeof(task_t)); newstask->run = run; newstask->arg = arg; newstask -> next = NULL; condition_lock(&pool -> ready); //将任务添加到对列中 if(pool -> first ==NULL) { pool -> first = newstask; } else pool -> last -> next = newstask; pool -> last = newstask; //如果有等待线程,则唤醒其中一个 if(pool -> idle > 0) { condition_signal(&pool -> ready); } else if(pool -> counter < pool -> max_threads) { pthread_t tid; pthread_create(&tid,NULL,thread_routine,pool); pool -> counter++; } condition_unlock(&pool -> ready); } //销毁线程池 void threadpool_destory(threadpool_t *pool) { if(pool -> quit) { return; } condition_lock(&pool -> ready); pool->quit = 1; if(pool -> counter > 0) { if(pool -> idle > 0) condition_broadcast(&pool->ready); while(pool -> counter > 0) { condition_wait(&pool->ready); } } condition_unlock(&pool->ready); condition_destory(&pool -> ready); } void *mytask(void *arg) { printf("thread 0x%0x is working on task %dn",(int)pthread_self(),*(int*)arg); sleep(1); free(arg); return NULL; } int main() { threadpool_t pool; threadpool_init(&pool,3); int i ; for(i = 0; i < 10; i++) { int *arg = (int *)malloc(sizeof(int)); *arg = i; threadpool_add_task(&pool,mytask,arg); } sleep(15); threadpool_destory(&pool); return 0; }
编译运行:gcc Threadpool.c -o Threadpool -lpthread -lrt
2.参考https://www.cnblogs.com/jiangzhaowei/p/10383049.html编写复杂的线程池;
3.改造属于自己的线程池代码。
3-1.c:
#include#include void *mythread1(void) { int i; for(i = 0; i < 10; i++) { printf("This is the pthread1,i=%dn",i); sleep(1); } } void *mythread2(void) { int i; for(i = 0; i < 10; i++) { printf("This is the pthread2,i=%dn",i ); sleep(1); } } int main(int argc, const char *argv[]) { int i = 0; int ret = 0; pthread_t id1,id2; ret = pthread_create(&id1, NULL, (void *)mythread1,NULL); if(ret) { printf("Create pthread error!n"); return 1; } ret = pthread_create(&id2, NULL, (void *)mythread2,NULL); if(ret) { printf("Create pthread error!n"); return 1; } pthread_join(id1,NULL); pthread_join(id2,NULL); return 0; }
实验3-1运行截图如下:
3-2.c:
#include#include #include #include #define MAX 20 pthread_t thread[2]; //两个线程 pthread_mutex_t mut; int number=0; int i; void *thread1() { printf ("thread1 : this is the thread1n"); for (i = 0; i < MAX; i++) //模拟线程执行时间 { printf("thread1 : number = %dn",number); pthread_mutex_lock(&mut); number++; pthread_mutex_unlock(&mut); sleep(2); } printf("thread1 :主函数在等我完成任务吗?n"); pthread_exit(NULL); } void *thread2() { printf("thread2 : this is the thread2n"); for (i = 0; i < MAX; i++) { printf("thread2 : number = %dn",number); pthread_mutex_lock(&mut); number++; pthread_mutex_unlock(&mut); sleep(3); } printf("thread2 :主函数在等我完成任务吗?n"); pthread_exit(NULL); } void thread_create(void) //创建两个线程 { int temp; memset(&thread, 0, sizeof(thread)); //comment1 if((temp = pthread_create(&thread[0], NULL, thread1, NULL)) != 0) //comment2 printf("线程1创建失败n"); else printf("线程1被创建n"); if((temp = pthread_create(&thread[1], NULL, thread2, NULL)) != 0) //comment3 printf("线程2创建失败n"); else printf("线程2被创建n"); } void thread_wait(void) { if(thread[0] !=0) { //comment4 pthread_join(thread[0],NULL); printf("线程1已经结束n"); } if(thread[1] !=0) { //comment5 pthread_join(thread[1],NULL); printf("线程2已经结束n"); } } int main() { pthread_mutex_init(&mut,NULL); printf("创建线程n"); thread_create(); printf("等待线程完成任务n"); thread_wait(); return 0; }
实验3-2运行截图如下:
3-3.c:
#include#include #include #include #include #define LEN 100000 int num = 0; void* thread_func(void* arg) { int i; for (i=0; i 3-3.c运行截图如下:
3-3-1.c:
#include#include #include #include #include #define LEN 100000 int num = 0; void* thread_func(void* arg) { pthread_mutex_t* p_mutex = (pthread_mutex_t*)arg; int i; for (i = 0; i< LEN; ++i) { pthread_mutex_lock(p_mutex); num += 1; pthread_mutex_unlock(p_mutex); } return NULL; } int main() { pthread_mutex_t m_mutex; pthread_mutex_init(&m_mutex, NULL); pthread_t tid1, tid2; pthread_create(&tid1, NULL, (void*)thread_func, (void*)&m_mutex); pthread_create(&tid2, NULL, (void*)thread_func, (void*)&m_mutex); pthread_join(tid1, NULL); pthread_join(tid2, NULL); pthread_mutex_destroy(&m_mutex); printf("correct result=%d, result=%d.n", 2*LEN, num); return 0; } 3-3-1.c运行截图如下:
3-4.c:
#include#include #include #include #include #include #include #include #define NUM 5 int queue[NUM]; sem_t psem, csem; void producer(void* arg) { int pos = 0; int num, count = 0; int i; for (i=0; i<12; ++i) { num = rand() % 100; count += num; sem_wait(&psem); queue[pos] = num; sem_post(&csem); printf("producer: %dn", num); pos = (pos+1) % NUM; sleep(rand()%2); } printf("producer count=%dn", count); } void consumer(void* arg){ int pos = 0; int num, count = 0; int i; for (i=0; i<12; ++i) { sem_wait(&csem); num = queue[pos]; sem_post(&psem); printf("consumer: %dn", num); count += num; pos = (pos+1) % NUM; sleep(rand()%3); } printf("consumer count=%dn", count); } int main() { sem_init(&psem, 0, NUM); sem_init(&csem, 0, 0); pthread_t tid[2]; pthread_create(&tid[0], NULL, (void*)producer, NULL); pthread_create(&tid[1], NULL, (void*)consumer, NULL); pthread_join(tid[0], NULL); pthread_join(tid[1], NULL); sem_destroy(&psem); sem_destroy(&csem); return 0; } 3-4.c运行截图如下:
3-5.c:#include#include #include #include #include #include #include typedef struct condition { pthread_mutex_t pmutex; pthread_cond_t pcond; }condition_t; typedef struct task { void *(*run)(void *arg); void *arg; struct task *next; }task_t; typedef struct threadpool { condition_t ready; task_t *first; task_t *last; int counter; int idle; int max_threads; int quit; }threadpool_t; int condition_init(condition_t *cond) { int status; if((status = pthread_mutex_init(&cond->pmutex,NULL)))//返回0代表初始化成功 return status; if((status = pthread_cond_init(&cond->pcond,NULL))) return status; return 0; } int condition_lock(condition_t *cond) { return pthread_mutex_lock(&cond -> pmutex); } int condition_unlock(condition_t *cond) { return pthread_mutex_unlock(&cond -> pmutex); } int condition_wait(condition_t *cond) { return pthread_cond_wait(&cond -> pcond,&cond -> pmutex); } int condition_timewait(condition_t *cond,const struct timespec *abstime) { return pthread_cond_timedwait(&cond->pcond,&cond->pmutex,abstime); } int condition_signal(condition_t *cond) { return pthread_cond_signal(&cond->pcond); } int condition_broadcast(condition_t *cond) { return pthread_cond_broadcast(&cond -> pcond); } int condition_destory(condition_t *cond) { int status; if((status = pthread_mutex_destroy(&cond -> pmutex))) return status; if((status = pthread_cond_destroy(&cond -> pcond))) return status; return 0; } void *thread_routine(void *arg) { struct timespec abstime; int timeout; printf("thread 0x%0x is startingn",(int)pthread_self()); threadpool_t *pool = (threadpool_t *)arg; while(1) { timeout = 0; condition_lock(&pool -> ready); pool -> idle++; //等待队列有任务到来或者线程池销毁的通知 while(pool -> first == NULL && !pool -> quit) { printf("thread 0x%0x is waitingn",(int)pthread_self()); clock_gettime(CLOCK_REALTIME,&abstime); abstime.tv_sec += 2; int status=condition_timewait(&pool -> ready,&abstime); if(status == ETIMEDOUT) { printf("thread 0x%0x is wait timed outn",(int)pthread_self()); timeout = 1; break; } } //等到到条件,处于工作状态 pool -> idle--; if(pool -> first != NULL) { task_t *t = pool -> first; pool -> first = t -> next; //需要先解锁,以便添加新任务。其他消费者线程能够进入等待任务。 condition_unlock(&pool -> ready); t -> run(t->arg); free(t); condition_lock(&pool -> ready); } //等待线程池销毁的通知 if(pool -> quit && pool ->first == NULL) { pool -> counter--; if(pool->counter == 0) { condition_signal(&pool -> ready); } condition_unlock(&pool->ready); //跳出循环之前要记得解锁 break; } if(timeout &&pool -> first ==NULL) { pool -> counter--; condition_unlock(&pool->ready); //跳出循环之前要记得解锁 break; } condition_unlock(&pool -> ready); } printf("thread 0x%0x is exitingn",(int)pthread_self()); return NULL; } //初始化 void threadpool_init(threadpool_t *pool, int threads) { condition_init(&pool -> ready); pool -> first = NULL; pool -> last = NULL; pool -> counter = 0; pool -> idle = 0; pool -> max_threads = threads; pool -> quit = 0; } //加任务 void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg),void *arg) { task_t *newstask = (task_t *)malloc(sizeof(task_t)); newstask->run = run; newstask->arg = arg; newstask -> next = NULL; condition_lock(&pool -> ready); //将任务添加到对列中 if(pool -> first ==NULL) { pool -> first = newstask; } else pool -> last -> next = newstask; pool -> last = newstask; //如果有等待线程,则唤醒其中一个 if(pool -> idle > 0) { condition_signal(&pool -> ready); } else if(pool -> counter < pool -> max_threads) { pthread_t tid; pthread_create(&tid,NULL,thread_routine,pool); pool -> counter++; } condition_unlock(&pool -> ready); } //销毁线程池 void threadpool_destory(threadpool_t *pool) { if(pool -> quit) { return; } condition_lock(&pool -> ready); pool->quit = 1; if(pool -> counter > 0) { if(pool -> idle > 0) condition_broadcast(&pool->ready); while(pool -> counter > 0) { condition_wait(&pool->ready); } } condition_unlock(&pool->ready); condition_destory(&pool -> ready); } void *mytask(void *arg) { printf("thread 0x%0x is working on task %dn",(int)pthread_self(),*(int*)arg); sleep(1); free(arg); return NULL; } int main() { threadpool_t pool; threadpool_init(&pool,3); int i ; for(i = 0; i < 10; i++) { int *arg = (int *)malloc(sizeof(int)); *arg = i; threadpool_add_task(&pool,mytask,arg); } sleep(15); threadpool_destory(&pool); return 0; } 3-5.c运行截图如下:
2.参考https://www.cnblogs.com/jiangzhaowei/p/10383049.html编写复杂的线程池;
Main.c:#include#include #include #include #include #include #include #include "lib_thread_pool.h" static void* thread_1(void* arg); static void* thread_2(void* arg); static void* thread_3(void* arg); static void DisplayPoolStatus(CThread_pool_t* pPool); int nKillThread = 0; int main() { CThread_pool_t* pThreadPool = NULL; pThreadPool = ThreadPoolConstruct(2, 1); int nNumInput = 5; char LogInput[] = "OK!"; DisplayPoolStatus(pThreadPool); pThreadPool->AddWorkUnlimit((void*)pThreadPool, thread_1, (void*)NULL); usleep(10); pThreadPool->AddWorkUnlimit((void*)pThreadPool, thread_2, (void*)nNumInput); usleep(10); pThreadPool->AddWorkUnlimit((void*)pThreadPool, thread_3, (void*)LogInput); usleep(10); DisplayPoolStatus(pThreadPool); nKillThread = 1; usleep(100); DisplayPoolStatus(pThreadPool); nKillThread = 2; usleep(100); DisplayPoolStatus(pThreadPool); nKillThread = 3; usleep(100); DisplayPoolStatus(pThreadPool); pThreadPool->Destruct((void*)pThreadPool); return 0; } static void* thread_1(void* arg) { printf("Thread 1 is running !n"); while(nKillThread != 1) usleep(10); return NULL; } static void* thread_2(void* arg) { int nNum = (int)arg; printf("Thread 2 is running !n"); printf("Get Number %dn", nNum); while(nKillThread != 2) usleep(10); return NULL; } static void* thread_3(void* arg) { char *pLog = (char*)arg; printf("Thread 3 is running !n"); printf("Get String %sn", pLog); while(nKillThread != 3) usleep(10); return NULL; } static void DisplayPoolStatus(CThread_pool_t* pPool) { static int nCount = 1; printf("******************n"); printf("nCount = %dn", nCount++); printf("max_thread_num = %dn", pPool->GetMaxThreadNum((void*)pPool)); printf("current_pthread_num = %dn", pPool->GetCurThreadNum((void*)pPool)); printf("current_pthread_task_num = %dn", pPool->GetCurTaskThreadNum((void*)pPool)); printf("cur_queue_size = %dn", pPool->GetCurTaskNum((void*)pPool)); printf("******************n"); } Lib_thread_pool.c:
#include#include #include #include #include #include #include #include "lib_thread_pool.h" static void * ThreadPoolRoutine (void *arg); static int ThreadPoolAddWorkLimit(void* pthis,void *(*process) (void *arg), void *arg) { CThread_pool_t *pool = (CThread_pool_t *)pthis; worker_t *newworker = (worker_t *) malloc (sizeof (worker_t)); if (NULL == newworker) { return -1; } newworker->process = process; newworker->arg = arg; newworker->next = NULL; pthread_mutex_lock(&(pool->queue_lock)); worker_t *member = pool->queue_head; if (member != NULL) { while (member->next != NULL) { member = member->next; } member->next = newworker; } else { pool->queue_head = newworker; } assert (pool->queue_head != NULL); pool->cur_queue_size++; int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num; if((0 == FreeThreadNum) && (pool->current_pthread_num < pool->max_thread_num)) { int current_pthread_num = pool->current_pthread_num; pool->threadid = (pthread_t *) realloc(pool->threadid,(current_pthread_num + 1) * sizeof (pthread_t)); pthread_create (&(pool->threadid[current_pthread_num]), NULL, ThreadPoolRoutine, (void*)pool); pool->current_pthread_num++; pool->current_pthread_task_num++; pthread_mutex_unlock (&(pool->queue_lock)); pthread_cond_signal (&(pool->queue_ready)); return 0; } pool->current_pthread_task_num++; pthread_mutex_unlock(&(pool->queue_lock)); pthread_cond_signal(&(pool->queue_ready)); // usleep(10); //看情况加 return 0; } static int ThreadPoolAddWorkUnlimit(void* pthis,void *(*process) (void *arg), void *arg) { CThread_pool_t *pool = (CThread_pool_t *)pthis; worker_t *newworker = (worker_t *) malloc (sizeof (worker_t)); if (NULL == newworker) { return -1; } newworker->process = process; newworker->arg = arg; newworker->next = NULL; pthread_mutex_lock(&(pool->queue_lock)); worker_t *member = pool->queue_head; if (member != NULL) { while (member->next != NULL) { member = member->next; } member->next = newworker; } else { pool->queue_head = newworker; } assert (pool->queue_head != NULL); pool->cur_queue_size++; int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num; if(0 == FreeThreadNum) { int current_pthread_num = pool->current_pthread_num; pool->threadid = (pthread_t *) realloc(pool->threadid,(current_pthread_num + 1) * sizeof (pthread_t)); pthread_create (&(pool->threadid[current_pthread_num]), NULL, ThreadPoolRoutine, (void*)pool); pool->current_pthread_num++; if (pool->current_pthread_num > pool->max_thread_num) { pool->max_thread_num = pool->current_pthread_num; } pool->current_pthread_task_num++; pthread_mutex_unlock (&(pool->queue_lock)); pthread_cond_signal (&(pool->queue_ready)); return 0; } pool->current_pthread_task_num++; pthread_mutex_unlock(&(pool->queue_lock)); pthread_cond_signal(&(pool->queue_ready)); // usleep(10); //看情况加 return 0; } static int ThreadPoolGetThreadMaxNum(void* pthis) { CThread_pool_t *pool = (CThread_pool_t *)pthis; pthread_mutex_lock(&(pool->queue_lock)); int num = pool->max_thread_num; pthread_mutex_unlock(&(pool->queue_lock)); return num; } static int ThreadPoolGetCurrentThreadNum(void* pthis) { CThread_pool_t *pool = (CThread_pool_t *)pthis; pthread_mutex_lock(&(pool->queue_lock)); int num = pool->current_pthread_num; pthread_mutex_unlock(&(pool->queue_lock)); return num; } static int ThreadPoolGetCurrentTaskThreadNum(void* pthis) { CThread_pool_t *pool = (CThread_pool_t *)pthis; pthread_mutex_lock(&(pool->queue_lock)); int num = pool->current_pthread_task_num; pthread_mutex_unlock(&(pool->queue_lock)); return num; } static int ThreadPoolGetCurrentTaskNum(void* pthis) { CThread_pool_t *pool = (CThread_pool_t *)pthis; pthread_mutex_lock(&(pool->queue_lock)); int num = pool->cur_queue_size; pthread_mutex_unlock(&(pool->queue_lock)); return num; } static int ThreadPoolDestroy (void *pthis) { CThread_pool_t *pool = (CThread_pool_t *)pthis; if (pool->shutdown) { return -1; } pool->shutdown = 1; pthread_cond_broadcast (&(pool->queue_ready)); int i; for (i = 0; i < pool->current_pthread_num; i++) { pthread_join (pool->threadid[i], NULL); } free (pool->threadid); worker_t *head = NULL; while (pool->queue_head != NULL) { head = pool->queue_head; pool->queue_head = pool->queue_head->next; free (head); } pthread_mutex_destroy(&(pool->queue_lock)); pthread_cond_destroy(&(pool->queue_ready)); free (pool); pool=NULL; return 0; } static void * ThreadPoolRoutine (void *arg) { CThread_pool_t *pool = (CThread_pool_t *)arg; while (1) { pthread_mutex_lock (&(pool->queue_lock)); while ((pool->cur_queue_size == 0) && (!pool->shutdown)) { pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)); } if (pool->shutdown) { pthread_mutex_unlock (&(pool->queue_lock)); pthread_exit (NULL); } assert (pool->cur_queue_size != 0); assert (pool->queue_head != NULL); pool->cur_queue_size--; worker_t *worker = pool->queue_head; pool->queue_head = worker->next; pthread_mutex_unlock (&(pool->queue_lock)); (*(worker->process)) (worker->arg); pthread_mutex_lock (&(pool->queue_lock)); pool->current_pthread_task_num--; free (worker); worker = NULL; if ((pool->current_pthread_num - pool->current_pthread_task_num) > pool->free_pthread_num) { pthread_mutex_unlock (&(pool->queue_lock)); break; } pthread_mutex_unlock (&(pool->queue_lock)); } pool->current_pthread_num--; pthread_exit (NULL); return (void*)NULL; } CThread_pool_t* ThreadPoolConstruct(int max_num,int free_num) { CThread_pool_t *pool = (CThread_pool_t *) malloc (sizeof (CThread_pool_t)); if (NULL == pool) { return NULL; } memset(pool, 0, sizeof(CThread_pool_t)); pthread_mutex_init (&(pool->queue_lock), NULL); pthread_cond_init (&(pool->queue_ready), NULL); pool->queue_head = NULL; pool->max_thread_num = max_num; pool->cur_queue_size = 0; pool->current_pthread_task_num = 0; pool->shutdown = 0; pool->current_pthread_num = 0; pool->free_pthread_num = free_num; pool->threadid = NULL; pool->threadid = (pthread_t *) malloc (max_num * sizeof (pthread_t)); pool->AddWorkUnlimit = ThreadPoolAddWorkUnlimit; pool->AddWorkLimit = ThreadPoolAddWorkLimit; pool->Destruct = ThreadPoolDestroy; pool->GetMaxThreadNum = ThreadPoolGetThreadMaxNum; pool->GetCurThreadNum = ThreadPoolGetCurrentThreadNum; pool->GetCurTaskThreadNum = ThreadPoolGetCurrentTaskThreadNum; pool->GetCurTaskNum = ThreadPoolGetCurrentTaskNum; int i = 0; for (i = 0; i < max_num; i++) { pool->current_pthread_num++; pthread_create (&(pool->threadid[i]), NULL, ThreadPoolRoutine, (void*)pool); usleep(1000); } return pool; } CThread_pool_t* ThreadPoolConstructDefault(void) { CThread_pool_t *pool = (CThread_pool_t *) malloc (sizeof (CThread_pool_t)); if (NULL == pool) { return NULL; } memset(pool, 0, sizeof(CThread_pool_t)); pthread_mutex_init(&(pool->queue_lock), NULL); pthread_cond_init(&(pool->queue_ready), NULL); pool->queue_head = NULL; pool->max_thread_num = DEFAULT_MAX_THREAD_NUM; pool->cur_queue_size = 0; pool->current_pthread_task_num = 0; pool->shutdown = 0; pool->current_pthread_num = 0; pool->free_pthread_num = DEFAULT_FREE_THREAD_NUM; pool->threadid = NULL; pool->AddWorkUnlimit = ThreadPoolAddWorkUnlimit; pool->AddWorkLimit = ThreadPoolAddWorkLimit; pool->Destruct = ThreadPoolDestroy; pool->GetMaxThreadNum = ThreadPoolGetThreadMaxNum; pool->GetCurThreadNum = ThreadPoolGetCurrentThreadNum; pool->GetCurTaskThreadNum = ThreadPoolGetCurrentTaskThreadNum; pool->GetCurTaskNum = ThreadPoolGetCurrentTaskNum; return pool; } Lib_thread_pool.h: #ifndef __PTHREAD_POOL_H__ #define __PTHREAD_POOL_H__ #include #define DEFAULT_MAX_THREAD_NUM 100 #define DEFAULT_FREE_THREAD_NUM 10 typedef struct worker_t worker_t; typedef struct CThread_pool_t CThread_pool_t; struct worker_t { void *(*process) (void *arg); int paratype; void *arg; struct worker_t *next; }; struct CThread_pool_t { pthread_mutex_t queue_lock; pthread_cond_t queue_ready; worker_t *queue_head; int shutdown; pthread_t *threadid; int max_thread_num; int current_pthread_num; int current_pthread_task_num; int cur_queue_size; int free_pthread_num; int (*AddWorkUnlimit)(void* pthis,void *(*process) (void *arg), void *arg); int (*AddWorkLimit)(void* pthis,void *(*process) (void *arg), void *arg); int (*GetMaxThreadNum) (void *pthis); int (*GetCurThreadNum) (void *pthis); int (*GetCurTaskThreadNum) (void *pthis); int (*GetCurTaskNum) (void *pthis); int (*Destruct) (void *pthis); }; CThread_pool_t* ThreadPoolConstruct(int max_num,int free_num); CThread_pool_t* ThreadPoolConstructDefault(void); #endif Makefile:
edit:main.o lib_thread_pool.o gcc -pthread -o edit main.o lib_thread_pool.o main.o:main.c lib_thread_pool.h gcc -pthread -c main.c lib_thread_pool.o:lib_thread_pool.c lib_thread_pool.h gcc -pthread -c lib_thread_pool.c .PHONY : clean clean: rm -rf *.o运行:
Make ./edit Make clean程序运行截图:
3.改造属于自己的线程池代码
Threadpool.h:#ifndef __THREAD_POOL_H__ #define __THREAD_POOL_H__ #includetypedef void *(*pool_task_f)(void *arg); typedef struct _task{ pool_task_f process; void *arg; struct _task *next;}pool_task; typedef struct{ pthread_t *threadid; int threads_limit; int destroy_flag; pool_task *queue_head; int task_in_queue; pthread_mutex_t queue_lock; pthread_cond_t queue_ready; }pool_t; void pool_init(pool_t *pool, int threads_limit); int pool_uninit(pool_t *pool); int pool_add_task(pool_t *pool, pool_task_f process, void *arg); #endif Threadpool.c:
#include#include #include #include #include "threadpool.h" static void *pool_thread_server(void *arg); void pool_init(pool_t *pool, int threads_limit){ pool->threads_limit = threads_limit; pool->queue_head = NULL; pool->task_in_queue = 0; pool->destroy_flag = 0; pool->threadid = (pthread_t *)calloc(threads_limit, sizeof(pthread_t)); int i = 0; pthread_mutex_init(&(pool->queue_lock), NULL); pthread_cond_init(&(pool->queue_ready), NULL); for (i = 0; i < threads_limit; i++){ pthread_create(&(pool->threadid[i]), NULL, pool_thread_server, pool); } return; } int pool_uninit(pool_t *pool){ pool_task *head = NULL; int i; pthread_mutex_lock(&(pool->queue_lock)); if(pool->destroy_flag) return -1; pool->destroy_flag = 1; pthread_mutex_unlock(&(pool->queue_lock)); pthread_cond_broadcast(&(pool->queue_ready)); for (i = 0; i < pool->threads_limit; i++) pthread_join(pool->threadid[i], NULL); free(pool->threadid); pthread_mutex_lock(&(pool->queue_lock)); while(pool->queue_head != NULL){ head = pool->queue_head; pool->queue_head = pool->queue_head->next; free(head); } pthread_mutex_unlock(&(pool->queue_lock)); pthread_mutex_destroy(&(pool->queue_lock)); pthread_cond_destroy(&(pool->queue_ready)); return 0; } static void enqueue_task(pool_t *pool, pool_task_f process, void *arg){ pool_task *task = NULL; pool_task *member = NULL; pthread_mutex_lock(&(pool->queue_lock)); if(pool->task_in_queue >= pool->threads_limit){ printf("task_in_queue > threads_limit!n"); pthread_mutex_unlock (&(pool->queue_lock)); return; } task = (pool_task *)calloc(1, sizeof(pool_task)); assert(task != NULL); task->process = process; task->arg = arg; task->next = NULL; pool->task_in_queue++; member = pool->queue_head; if(member != NULL){ while(member->next != NULL) member = member->next; member->next = task; }else{ pool->queue_head = task; } printf("ttasks %dn", pool->task_in_queue); pthread_cond_signal (&(pool->queue_ready)); pthread_mutex_unlock (&(pool->queue_lock));} static pool_task *dequeue_task(pool_t *pool){ pool_task *task = NULL; pthread_mutex_lock(&(pool->queue_lock)); if(pool->destroy_flag){ pthread_mutex_unlock(&(pool->queue_lock)); printf("thread 0x%lx will be destroyedn", pthread_self()); pthread_exit(NULL); } if(pool->task_in_queue == 0){ while((pool->task_in_queue == 0) && (!pool->destroy_flag)){ printf("thread 0x%lx is leisuren", pthread_self()); pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)); } }else{ pool->task_in_queue--; task = pool->queue_head; pool->queue_head = task->next; printf("thread 0x%lx received a taskn", pthread_self()); } pthread_mutex_unlock(&(pool->queue_lock)); return task;} int pool_add_task(pool_t *pool, pool_task_f process, void *arg){ enqueue_task(pool, process, arg); return 0;} static void *pool_thread_server(void *arg){ pool_t *pool = NULL; pool = (pool_t *)arg; while(1){ pool_task *task = NULL; task = dequeue_task(pool); if(task != NULL){ printf ("thread 0x%lx is busyn", pthread_self()); task->process(task->arg); free(task); task = NULL; } } pthread_exit(NULL); return NULL; } Main.c:
#include#include #include "threadpool.h" void *task_test(void *arg){ printf("ttworking on task %dn", (int)arg); sleep(1); return NULL; } int main (int argc, char *argv[]){ pool_t pool; int i = 0; pool_init(&pool, 2);//初始化线程池 sleep(1); for(i = 0; i < 5; i++){ sleep(1); pool_add_task(&pool, task_test, (void *)i);//向线程池中添加一个任务 } sleep(4); pool_uninit(&pool);//销毁线程池 return 0; } 运行:
gcc -L/usr/DirectFB25/lib -I/usr/DirectFB25/include/directfb -lpthread -ldl main.c threadpool.c threadpool.h -o edit ./edit运行截图如下:
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)