Linux第三次实验:Linux下的线程及线程池实验

Linux第三次实验:Linux下的线程及线程池实验,第1张

Linux第三次实验:Linux下的线程及线程池实验

Linux第三次实验:Linux下的线程及线程池实验
  • 前言
  • 一、实验目的
  • 二、实验工具与设备
  • 三、实验预备知识
  • 四、实验内容和步骤
  • 五、实验代码及步骤截图

前言

为了帮助同学们完成痛苦的实验课程设计,本作者将其作出的实验结果及代码贴至CSDN中,供同学们学习参考。如有不足或描述不完善之处,敬请各位指出,欢迎各位的斧正!

一、实验目的
  1. 理解Linux的线程原理。
  2. 掌握和使用线程函数实现线程的操作。
  3. 掌握和使用互斥锁和信号量实现线程同步。
  4. 掌握线程池。
二、实验工具与设备

装有Linux系统的计算机

三、实验预备知识

线程可以提高应用程序在多核环境下处理诸如文件I/O或者socket I/O等会产生堵塞的情况的表现性能。在Unix系统中,一个进程包含很多东西,包括可执行程序以及一大堆的诸如文件描述符地址空间等资源。在很多情况下,完成相关任务的不同代码间需要交换数据。如果采用多进程的方式,那么通信就需要在用户空间和内核空间进行频繁的切换,开销很大。但是如果使用多线程的方式,因为可以使用共享的全局变量,所以线程间的通信(数据交换)变得非常高效。
头文件:#include <pthread.h>
需要的编译条件:-pthread
线程结构体pthread_attr_t中记录线程的信息。

/*
 * Information recorded in a thread-attribute object, as presented by this
 * tutorial.  Illustrative only: application code should treat
 * pthread_attr_t as opaque and use the pthread_attr_* functions.
 */
typedef struct
{
    int                detachstate;   /* detach state of the thread */
    int                schedpolicy;   /* scheduling policy of the thread */
    struct sched_param schedparam;    /* scheduling parameters of the thread */
    int                inheritsched;  /* scheduler inheritance of the thread */
    int                scope;         /* contention scope of the thread */
    size_t             guardsize;     /* size of the guard buffer at the end of the thread stack */
    int                stackaddr_set; /* presumably set once stackaddr was given explicitly -- TODO confirm */
    void*              stackaddr;     /* location of the thread stack */
    size_t             stacksize;     /* size of the thread stack */
}pthread_attr_t;

线程相关的接口函数:
对线程属性初始化/去除初始化

int pthread_attr_init(pthread_attr_t *attr); int pthread_attr_destroy(pthread_attr_t *attr);

创建线程

int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);

向线程发送终止信号

int pthread_cancel(pthread_t thread);

使线程进入分离态

int pthread_detach(pthread_t thread);

比较两个线程的标识符

int pthread_equal(pthread_t t1, pthread_t t2);

使线程退出

void pthread_exit(void *retval);

获取指定线程当前的实际属性,并填入attr所指向的结构体(GNU扩展,非POSIX标准)

int pthread_getattr_np(pthread_t thread, pthread_attr_t *attr);

等待指定线程的结束

int pthread_join(pthread_t thread, void **retval);

返回调用线程自身的线程标识符(线程ID)

pthread_t pthread_self(void);
四、实验内容和步骤
  1. 创建线程。
#include 
#include 
 /*
  * Body of the first demo thread: prints ten numbered lines, one per second.
  * Takes no argument and returns NULL so the thread has a defined exit value
  * (the original fell off the end of a non-void function).
  * NOTE(review): pthread_create() expects void *(*)(void *); main() casts
  * this function, so a `void *arg` parameter would be the cleaner signature.
  */
void *mythread1(void)
{
	int i;
	for(i = 0; i < 10; i++)
	{
		printf("This is the pthread1,i=%d\n",i);
		sleep(1);
	}
	return NULL;
}
/*
 * Body of the second demo thread: prints ten numbered lines, one per second.
 * The message now says "pthread2" (it was a copy of thread 1's text), ends
 * with a real newline, and the function returns NULL explicitly.
 */
void *mythread2(void)
{
	int i;
	for(i = 0; i < 10; i++)
	{
		printf("This is the pthread2,i=%d\n",i );
		sleep(1);
	}
	return NULL;
}
/*
 * Starts the two demo threads and waits for both of them.
 * Returns 0 on success, 1 when either thread could not be created.
 */
int main(int argc, const char *argv[])
{
	int ret = 0;
	pthread_t id1,id2;

	(void)argc;
	(void)argv;
	/* The thread bodies take no parameter, so cast them to the start-routine type. */
	ret = pthread_create(&id1, NULL, (void *(*)(void *))mythread1,NULL);
	if(ret)
	{
		printf("Create pthread error!\n");
		return 1;
	}
	ret = pthread_create(&id2, NULL, (void *(*)(void *))mythread2,NULL);
	if(ret)
	{
		printf("Create pthread error!\n");
		return 1;
	}
	pthread_join(id1,NULL);
	pthread_join(id2,NULL);
	return 0;
}

执行以下命令编译程序并执行:gcc -o Createthread Createthread.c -lpthread

  2. 创建两个线程实现对一个数的递加。
#include 
#include 
#include 
#include 
#define MAX 20          /* loop bound used by thread1() and thread2() */
pthread_t thread[2];  // the two threads
pthread_mutex_t mut;    /* mutex held around every number++ */
int number=0;           /* counter incremented by both threads */
int i;                  /* loop counter; NOTE(review): file-level, so thread1() and thread2() share it -- confirm this is intended */
/*
 * Thread 1 body: increments the shared counter MAX times, sleeping two
 * seconds after each step, then ends through pthread_exit().
 *
 * arg is unused (pthread_create() is called with NULL).
 * Fixes over the original: the loop counter is local (the file-level `i`
 * was shared with thread2, so the two loops raced on it and together ran
 * only about MAX steps), the counter is read under the mutex, and the
 * messages end with a real newline.
 */
void *thread1(void *arg)
{
        int i;
        int seen;

        (void)arg;
        printf ("thread1 : this is the thread1\n");
        for (i = 0; i < MAX; i++)   // simulates the running time of the thread
        {
                pthread_mutex_lock(&mut);
                seen = number;          /* value before this thread's increment */
                number++;
                pthread_mutex_unlock(&mut);
                printf("thread1 : number = %d\n",seen);
                sleep(2);
        }
        printf("thread1 :主函数在等我完成任务吗?\n");
        pthread_exit(NULL);
}
void *thread2()
{
        printf(""thread2 : this is the thread2n");
        for (i = 0; i < MAX; i++)
        {
                printf("thread2 : number = %dn",number);
                pthread_mutex_lock(&mut);
                        number++;
                pthread_mutex_unlock(&mut);
                sleep(3);
        }
        printf("thread2 :主函数在等我完成任务吗?n");
        pthread_exit(NULL);
}
/*
 * Creates the two worker threads and prints the outcome of each attempt.
 * A slot whose creation failed is cleared again, because pthread_create()
 * leaves it undefined on failure and thread_wait() tests the slot against 0.
 */
void thread_create(void)   // create the two threads
{
        int temp;
        memset(&thread, 0, sizeof(thread));          //comment1

        if((temp = pthread_create(&thread[0], NULL, thread1, NULL)) != 0)  //comment2
        {
                memset(&thread[0], 0, sizeof(thread[0]));
                printf("线程1创建失败\n");
        }
        else
                printf("线程1被创建\n");
        if((temp = pthread_create(&thread[1], NULL, thread2, NULL)) != 0)  //comment3
        {
                memset(&thread[1], 0, sizeof(thread[1]));
                printf("线程2创建失败\n");
        }
        else
                printf("线程2被创建\n");
}
/*
 * Joins each thread whose slot in `thread` is non-zero and reports it.
 * The messages now end with a real newline.
 * NOTE(review): comparing a pthread_t with 0 assumes an arithmetic
 * pthread_t (true on Linux); it is not portable.
 */
void thread_wait(void)
{
        if(thread[0] !=0)
        {             //comment4
                pthread_join(thread[0],NULL);
                printf("线程1已经结束\n");
        }
        if(thread[1] !=0)
        {
                //comment5
                pthread_join(thread[1],NULL);
                printf("线程2已经结束\n");
        }
}
/*
 * Initialises the mutex, starts both threads, waits for them and then
 * destroys the mutex (the original never released it).
 */
int main()
{
        pthread_mutex_init(&mut,NULL);
        printf("创建线程\n");
        thread_create();
        printf("等待线程完成任务\n");
        thread_wait();
        pthread_mutex_destroy(&mut);
        return 0;
}

通过以下命令编译:gcc -o thread_example thread_example.c -lpthread

  3. 线程同步
    未同步示例程序:
#include
#include
#include
#include
#include
#define LEN 100000
int num = 0;
/*
 * Thread body of the unsynchronised example: adds 1 to the global num
 * LEN times with no locking at all.  arg is ignored; returns NULL.
 */
void* thread_func(void* arg) {
    int remaining = LEN;
    while (remaining-- > 0) {
        num = num + 1;
    }
    return NULL;
}
/*
 * Runs thread_func in two threads at the same time and prints the expected
 * total next to the value actually reached.
 */
int main() {
    pthread_t tid1, tid2;
    pthread_create(&tid1, NULL, thread_func, NULL);
    pthread_create(&tid2, NULL, thread_func, NULL);
    /* The exit values are not used, so no result pointer is passed
     * (the original handed a char ** cast to void * to pthread_join). */
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);
    printf("correct result=%d, wrong result=%d.\n", 2*LEN, num);
    return 0;
}

运行结果:correct result=200000, wrong result=106860.
通过互斥解决同步问题的程序:

#include
#include
#include
#include
#include
#define LEN 100000
int num = 0;
/*
 * Thread body of the mutex-protected example.
 * arg: pointer to the pthread_mutex_t guarding num.
 * Adds 1 to num LEN times, locking the mutex around every addition.
 * Returns NULL.
 */
void* thread_func(void* arg) {
    pthread_mutex_t* guard = (pthread_mutex_t*)arg;
    int remaining = LEN;
    while (remaining-- > 0) {
        pthread_mutex_lock(guard);
        num = num + 1;
        pthread_mutex_unlock(guard);
    }
    return NULL;
}
/*
 * Runs thread_func in two threads that share one mutex, then prints the
 * expected total next to the value reached.
 */
int main() {
    pthread_mutex_t m_mutex;
    pthread_mutex_init(&m_mutex, NULL);
    pthread_t tid1, tid2;
    /* thread_func already has the start-routine type, so no cast is needed. */
    pthread_create(&tid1, NULL, thread_func, &m_mutex);
    pthread_create(&tid2, NULL, thread_func, &m_mutex);
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);
    pthread_mutex_destroy(&m_mutex);
    printf("correct result=%d, result=%d.\n", 2*LEN, num);
    return 0;
}

运行结果:correct result=200000, result=200000.

  4. 编写信号量使用程序
#include
#include
#include
#include
#include
#include
#include
#include
#define NUM 5        /* number of slots in the ring buffer */
int queue[NUM];      /* ring buffer written by producer() and read by consumer() */
sem_t psem, csem;    /* psem is waited on by the producer and posted by the consumer; csem the other way round */
/*
 * Producer thread body: generates 12 random values in 0..99 and stores them
 * in the ring buffer `queue`.  It waits on psem before writing a slot and
 * posts csem afterwards; the sum of everything produced is printed at the
 * end.  arg is unused.  The messages now end with a real newline.
 * NOTE(review): declared void but started through a cast in main(); a
 * pthread start routine is expected to return void *.
 */
void producer(void* arg) {
    int pos = 0;
    int num, count = 0;
    for (int i=0; i<12; ++i) {
        num = rand() % 100;
        count += num;
        sem_wait(&psem);
        queue[pos] = num;
        sem_post(&csem);
        printf("producer: %d\n", num);
        pos = (pos+1) % NUM;
        sleep(rand()%2);
    }
    printf("producer count=%d\n", count);
}
/*
 * Consumer thread body: takes 12 values out of the ring buffer `queue`.
 * It waits on csem before reading a slot and posts psem afterwards; the
 * sum of everything consumed is printed at the end.  arg is unused.
 * The messages now end with a real newline.
 * NOTE(review): declared void but started through a cast in main(); a
 * pthread start routine is expected to return void *.
 */
void consumer(void* arg){
    int pos = 0;
    int num, count = 0;
    for (int i=0; i<12; ++i) {
        sem_wait(&csem);
        num = queue[pos];
        sem_post(&psem);
        printf("consumer: %d\n", num);
        count += num;
        pos = (pos+1) % NUM;
        sleep(rand()%3);
    }
    printf("consumer count=%d\n", count);
}
/*
 * Sets psem to NUM and csem to 0, starts the producer and the consumer,
 * waits for both and destroys the semaphores.
 */
int main() {
    pthread_t tid[2];

    sem_init(&psem, 0, NUM);
    sem_init(&csem, 0, 0);

    pthread_create(&tid[0], NULL, (void*)producer, NULL);
    pthread_create(&tid[1], NULL, (void*)consumer, NULL);

    for (int k = 0; k < 2; ++k) {
        pthread_join(tid[k], NULL);
    }

    sem_destroy(&psem);
    sem_destroy(&csem);
    return 0;
}
  5. 编写线程池程序
    1.理解并运行以下线程实现
#include 
#include 
#include 
#include 
#include 
#include 
#include 
/* A mutex and a condition variable that are always used together. */
typedef struct condition
{
	pthread_mutex_t pmutex;	/* lock protecting the state the condition is about */
	pthread_cond_t pcond;	/* condition variable waited on with pmutex held */
}condition_t;
/* One queued unit of work: a callback, its argument and the link to the next task. */
typedef struct task
{
	void *(*run)(void *arg);	/* function executed by a worker thread */
	void *arg;			/* argument handed to run */
	struct task *next;		/* next task in the queue, NULL at the tail */
}task_t;
/* State of the thread pool. */
typedef struct threadpool
{
	condition_t ready;	/* lock/condition guarding every field below */
	task_t *first;		/* head of the task queue */
	task_t *last;		/* tail of the task queue */
	int counter;		/* number of worker threads currently alive */
	int idle;		/* number of workers currently waiting for work */
	int max_threads;	/* upper bound on counter */
	int quit;		/* set to 1 once the pool is being destroyed */
}threadpool_t;
/* Initialises the mutex, then the condition variable; returns the first
 * non-zero pthread status, or 0 when both succeeded. */
int condition_init(condition_t *cond)
{
	int rc = pthread_mutex_init(&cond->pmutex,NULL);
	if(rc != 0)
		return rc;
	rc = pthread_cond_init(&cond->pcond,NULL);
	return rc;
}
/* Locks the mutex of cond; returns the pthread status. */
int condition_lock(condition_t *cond)
{
	pthread_mutex_t *m = &cond->pmutex;
	return pthread_mutex_lock(m);
}
/* Unlocks the mutex of cond; returns the pthread status. */
int condition_unlock(condition_t *cond)
{
	pthread_mutex_t *m = &cond->pmutex;
	return pthread_mutex_unlock(m);
}
/* Waits on the condition variable with the mutex of cond. */
int condition_wait(condition_t *cond)
{
	pthread_cond_t *cv = &cond->pcond;
	pthread_mutex_t *m = &cond->pmutex;
	return pthread_cond_wait(cv,m);
}
/* Like condition_wait, but gives up at the absolute time abstime. */
int condition_timewait(condition_t *cond,const struct timespec *abstime)
{
	pthread_cond_t *cv = &cond->pcond;
	pthread_mutex_t *m = &cond->pmutex;
	return pthread_cond_timedwait(cv,m,abstime);
}
/* Signals the condition variable of cond. */
int condition_signal(condition_t *cond)
{
	pthread_cond_t *cv = &cond->pcond;
	return pthread_cond_signal(cv);
}
/* Broadcasts on the condition variable of cond. */
int condition_broadcast(condition_t *cond)
{
	pthread_cond_t *cv = &cond->pcond;
	return pthread_cond_broadcast(cv);
}
/* Destroys the mutex, then the condition variable; returns the first
 * non-zero pthread status, or 0 when both succeeded. */
int condition_destory(condition_t *cond)
{
	int rc = pthread_mutex_destroy(&cond->pmutex);
	if(rc != 0)
		return rc;
	rc = pthread_cond_destroy(&cond->pcond);
	return rc;
}
/*
 * Worker loop run by every pool thread; arg is the owning threadpool_t.
 *
 * Each pass takes the pool lock, waits (at most two seconds per wait) for a
 * task or for the quit flag, runs one task with the lock released, and
 * leaves the loop when the pool is quitting with an empty queue or when the
 * wait timed out with nothing queued.  Always returns NULL.
 *
 * Fixes over the original: the messages end with a real newline, and the
 * thread id is printed as unsigned long instead of being truncated to int.
 * NOTE(review): the cast assumes an integer pthread_t, as on Linux.
 */
void *thread_routine(void *arg)
{
	struct timespec abstime;
	int timeout;
	unsigned long self = (unsigned long)pthread_self();
	threadpool_t *pool = (threadpool_t *)arg;
	printf("thread 0x%lx is starting\n",self);
	while(1)
	{
		timeout = 0;
		condition_lock(&pool -> ready);
		pool -> idle++;
		// wait until a task is queued or the pool is being destroyed
		while(pool -> first == NULL && !pool -> quit)
		{
			printf("thread 0x%lx is waiting\n",self);
			clock_gettime(CLOCK_REALTIME,&abstime);
			abstime.tv_sec += 2;
			int status=condition_timewait(&pool -> ready,&abstime);
			if(status == ETIMEDOUT)
			{
				printf("thread 0x%lx is wait timed out\n",self);
				timeout = 1;
				break;
			}
		}
		// done waiting: this thread is no longer idle
		pool -> idle--;
		if(pool -> first != NULL)
		{
			task_t *t = pool -> first;
			pool -> first = t -> next;
			// unlock while the task runs so new tasks can be added and other workers can wait
			condition_unlock(&pool -> ready);
			t -> run(t->arg);
			free(t);
			condition_lock(&pool -> ready);
		}
		// the pool is quitting and nothing is left to run
		if(pool -> quit && pool ->first == NULL)
		{
			pool -> counter--;
			if(pool->counter == 0)
			{
				// last worker out wakes the thread blocked in threadpool_destory()
				condition_signal(&pool -> ready);
			}
			// unlock before leaving the loop
			condition_unlock(&pool->ready);
			break;
		}
		if(timeout &&pool -> first ==NULL)
		{
			pool -> counter--;
			// unlock before leaving the loop
			condition_unlock(&pool->ready);
			break;
		}
		condition_unlock(&pool -> ready);
	}
	printf("thread 0x%lx is exiting\n",self);
	return NULL;
}
/*
 * Initialises an empty pool: no tasks, no threads, not quitting.
 * threads is stored as the maximum number of worker threads.
 */
void threadpool_init(threadpool_t *pool, int threads)
{
	condition_init(&pool->ready);
	pool->first = pool->last = NULL;
	pool->counter = pool->idle = 0;
	pool->max_threads = threads;
	pool->quit = 0;
}
/*
 * Queues one task (run, arg) and makes sure a worker will pick it up:
 * an idle worker is signalled if there is one, otherwise a new worker is
 * started while fewer than max_threads exist.
 *
 * Fixes over the original: a failed malloc no longer dereferences NULL (the
 * task is dropped and arg stays with the caller), a worker is counted only
 * when pthread_create() succeeded, and new workers are detached because
 * nothing ever joins them.
 */
void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg),void *arg)
{
	task_t *newstask = (task_t *)malloc(sizeof(task_t));
	if(newstask == NULL)
	{
		fprintf(stderr,"threadpool_add_task: out of memory, task dropped\n");
		return;
	}
	newstask->run = run;
	newstask->arg = arg;
	newstask -> next = NULL;

	condition_lock(&pool -> ready);
	// append the task to the queue
	if(pool -> first ==NULL)
	{
		pool -> first = newstask;
	}
	else
		pool -> last -> next = newstask;
	pool -> last = newstask;
	// wake one waiting worker if there is any
	if(pool -> idle > 0)
	{
		condition_signal(&pool -> ready);
	}
	else if(pool -> counter < pool -> max_threads)
	{
		pthread_t tid;
		if(pthread_create(&tid,NULL,thread_routine,pool) == 0)
		{
			pthread_detach(tid);
			pool -> counter++;
		}
		else
		{
			// the task stays queued for an existing worker, if any
			fprintf(stderr,"threadpool_add_task: cannot create worker thread\n");
		}
	}
	condition_unlock(&pool -> ready);
}
/*
 * Shuts the pool down: sets the quit flag, wakes idle workers, blocks until
 * the worker count has dropped to zero and then destroys the lock/condition
 * pair.  Does nothing when the quit flag is already set.
 */
void  threadpool_destory(threadpool_t *pool)
{
	if(pool->quit != 0)
		return;

	condition_lock(&pool->ready);
	pool->quit = 1;
	if(pool->counter > 0 && pool->idle > 0)
		condition_broadcast(&pool->ready);
	/* the last worker to leave signals this condition */
	while(pool->counter > 0)
		condition_wait(&pool->ready);
	condition_unlock(&pool->ready);
	condition_destory(&pool->ready);
}
/*
 * Demo task: prints the calling thread and the task number, sleeps one
 * second and frees its argument.
 * arg: heap-allocated int holding the task number; freed here.
 * The message now ends with a real newline and the thread id is printed as
 * unsigned long instead of being truncated to int.
 */
void *mytask(void *arg)
{
	printf("thread 0x%lx is working on task %d\n",(unsigned long)pthread_self(),*(int*)arg);
	sleep(1);
	free(arg);
	return NULL;
}
/*
 * Creates a pool of at most three workers, queues ten numbered tasks,
 * sleeps 15 seconds and destroys the pool.
 * A failed malloc of a task argument now stops the queuing instead of
 * writing through NULL.
 */
int main()
{
	threadpool_t pool;
	threadpool_init(&pool,3);

	int i ;
	for(i = 0; i < 10; i++)
	{
		int *arg = (int *)malloc(sizeof(int));
		if(arg == NULL)
			break;
		*arg = i;
		threadpool_add_task(&pool,mytask,arg);	/* mytask frees arg */
	}

	sleep(15);
	threadpool_destory(&pool);
	return 0;
}

编译运行:gcc Threadpool.c -o Threadpool -lpthread -lrt
2.参考https://www.cnblogs.com/jiangzhaowei/p/10383049.html编写复杂的线程池;
3.改造属于自己的线程池代码。

五、实验代码及步骤截图

3-1.c:

#include 
#include 
 /*
  * Body of the first demo thread: prints ten numbered lines, one per second.
  * Takes no argument and returns NULL so the thread has a defined exit value
  * (the original fell off the end of a non-void function).
  */
void *mythread1(void)
{
	int i;
	for(i = 0; i < 10; i++)
	{
		printf("This is the pthread1,i=%d\n",i);
		sleep(1);
	}
	return NULL;
}
/*
 * Body of the second demo thread: prints ten numbered lines, one per second.
 * Returns NULL explicitly; the message now ends with a real newline.
 */
void *mythread2(void)
{
	int i;
	for(i = 0; i < 10; i++)
	{
		printf("This is the pthread2,i=%d\n",i );
		sleep(1);
	}
	return NULL;
}
/*
 * Starts the two demo threads and waits for both of them.
 * Returns 0 on success, 1 when either thread could not be created.
 */
int main(int argc, const char *argv[])
{
	int ret = 0;
	pthread_t id1,id2;

	(void)argc;
	(void)argv;
	/* The thread bodies take no parameter, so cast them to the start-routine type. */
	ret = pthread_create(&id1, NULL, (void *(*)(void *))mythread1,NULL);
	if(ret)
	{
		printf("Create pthread error!\n");
		return 1;
	}
	ret = pthread_create(&id2, NULL, (void *(*)(void *))mythread2,NULL);
	if(ret)
	{
		printf("Create pthread error!\n");
		return 1;
	}
	pthread_join(id1,NULL);
	pthread_join(id2,NULL);
	return 0;
}

实验3-1运行截图如下:

3-2.c:

#include 
#include 
#include 
#include 
#define MAX 20          /* loop bound used by thread1() and thread2() */
pthread_t thread[2];  // the two threads
pthread_mutex_t mut;    /* mutex held around every number++ */
int number=0;           /* counter incremented by both threads */
int i;                  /* loop counter; NOTE(review): file-level, so thread1() and thread2() share it -- confirm this is intended */
/*
 * Thread 1 body: increments the shared counter MAX times, sleeping two
 * seconds after each step, then ends through pthread_exit().
 *
 * arg is unused (pthread_create() is called with NULL).
 * Fixes over the original: the loop counter is local (the file-level `i`
 * was shared with thread2), the counter is read under the mutex, and the
 * messages end with a real newline.
 */
void *thread1(void *arg)
{
	int i;
	int seen;

	(void)arg;
	printf ("thread1 : this is the thread1\n");
	for (i = 0; i < MAX; i++)   // simulates the running time of the thread
	{
		pthread_mutex_lock(&mut);
		seen = number;		/* value before this thread's increment */
		number++;
		pthread_mutex_unlock(&mut);
		printf("thread1 : number = %d\n",seen);
		sleep(2);
	}
	printf("thread1 :主函数在等我完成任务吗?\n");
	pthread_exit(NULL);
}
/*
 * Thread 2 body: increments the shared counter MAX times, sleeping three
 * seconds after each step, then ends through pthread_exit().
 *
 * arg is unused (pthread_create() is called with NULL).
 * Fixes over the original: the loop counter is local (the file-level `i`
 * was shared with thread1), the counter is read under the mutex, and the
 * messages end with a real newline.
 */
void *thread2(void *arg)
{
	int i;
	int seen;

	(void)arg;
	printf("thread2 : this is the thread2\n");
	for (i = 0; i < MAX; i++)
	{
		pthread_mutex_lock(&mut);
		seen = number;		/* value before this thread's increment */
		number++;
		pthread_mutex_unlock(&mut);
		printf("thread2 : number = %d\n",seen);
		sleep(3);
	}
	printf("thread2 :主函数在等我完成任务吗?\n");
	pthread_exit(NULL);
}
/*
 * Creates the two worker threads and prints the outcome of each attempt.
 * A slot whose creation failed is cleared again, because pthread_create()
 * leaves it undefined on failure and thread_wait() tests the slot against 0.
 */
void thread_create(void)   // create the two threads
{
	int temp;
	memset(&thread, 0, sizeof(thread));          //comment1

	if((temp = pthread_create(&thread[0], NULL, thread1, NULL)) != 0)  //comment2
	{
		memset(&thread[0], 0, sizeof(thread[0]));
		printf("线程1创建失败\n");
	}
	else
		printf("线程1被创建\n");
	if((temp = pthread_create(&thread[1], NULL, thread2, NULL)) != 0)  //comment3
	{
		memset(&thread[1], 0, sizeof(thread[1]));
		printf("线程2创建失败\n");
	}
	else
		printf("线程2被创建\n");
}
/*
 * Joins each thread whose slot in `thread` is non-zero and reports it.
 * The messages now end with a real newline.
 * NOTE(review): comparing a pthread_t with 0 assumes an arithmetic
 * pthread_t (true on Linux); it is not portable.
 */
void thread_wait(void)
{
	if(thread[0] !=0)
	{             //comment4
		pthread_join(thread[0],NULL);
		printf("线程1已经结束\n");
	}
	if(thread[1] !=0)
	{
		//comment5
		pthread_join(thread[1],NULL);
		printf("线程2已经结束\n");
	}
}
/*
 * Initialises the mutex, starts both threads, waits for them and then
 * destroys the mutex (the original never released it).
 */
int main()
{
	pthread_mutex_init(&mut,NULL);
	printf("创建线程\n");
	thread_create();
	printf("等待线程完成任务\n");
	thread_wait();
	pthread_mutex_destroy(&mut);
	return 0;
}

实验3-2运行截图如下:

3-3.c:

#include
#include
#include
#include
#include
#define LEN 100000
int num = 0;
/*
 * Thread body of the unsynchronised example: adds 1 to the global num
 * LEN times with no locking.  arg is ignored; returns NULL.
 *
 * The listing was cut off after "for (i=0; i"; the remainder is restored
 * from the same unsynchronised program shown earlier in this document.
 */
void* thread_func(void* arg)
{
	int i;
	for (i=0; i < LEN; ++i)
	{
		num += 1;
	}
	return NULL;
}
/*
 * Runs thread_func in two threads at the same time and prints the expected
 * total next to the value actually reached.
 */
int main()
{
	pthread_t tid1, tid2;
	pthread_create(&tid1, NULL, thread_func, NULL);
	pthread_create(&tid2, NULL, thread_func, NULL);
	pthread_join(tid1, NULL);
	pthread_join(tid2, NULL);
	printf("correct result=%d, wrong result=%d.\n", 2*LEN, num);
	return 0;
}

3-3.c运行截图如下:

3-3-1.c:

#include
#include
#include
#include
#include
#define LEN 100000
int num = 0;
/*
 * Thread body of the mutex-protected example.
 * arg: pointer to the pthread_mutex_t guarding num.
 * Adds 1 to num LEN times, locking the mutex around every addition.
 * Returns NULL.
 */
void* thread_func(void* arg)
{
	pthread_mutex_t* guard = (pthread_mutex_t*)arg;
	int remaining = LEN;
	while (remaining-- > 0)
	{
		pthread_mutex_lock(guard);
		num = num + 1;
		pthread_mutex_unlock(guard);
	}
	return NULL;
}
/*
 * Runs thread_func in two threads that share one mutex, then prints the
 * expected total next to the value reached.
 */
int main()
{
    pthread_mutex_t m_mutex;
    pthread_mutex_init(&m_mutex, NULL);
    pthread_t tid1, tid2;
    /* thread_func already has the start-routine type, so no cast is needed. */
    pthread_create(&tid1, NULL, thread_func, &m_mutex);
    pthread_create(&tid2, NULL, thread_func, &m_mutex);
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);
    pthread_mutex_destroy(&m_mutex);
    printf("correct result=%d, result=%d.\n", 2*LEN, num);
    return 0;
}

3-3-1.c运行截图如下:

3-4.c:

#include
#include
#include
#include
#include
#include
#include
#include
#define NUM 5        /* number of slots in the ring buffer */
int queue[NUM];      /* ring buffer written by producer() and read by consumer() */
sem_t psem, csem;    /* psem is waited on by the producer and posted by the consumer; csem the other way round */
/*
 * Producer thread body: generates 12 random values in 0..99 and stores them
 * in the ring buffer `queue`.  It waits on psem before writing a slot and
 * posts csem afterwards; the sum of everything produced is printed at the
 * end.  arg is unused.  The messages now end with a real newline.
 * NOTE(review): declared void but started through a cast in main(); a
 * pthread start routine is expected to return void *.
 */
void producer(void* arg) {
    int pos = 0;
    int num, count = 0;
    int i;
    for (i=0; i<12; ++i) {
        num = rand() % 100;
        count += num;
        sem_wait(&psem);
        queue[pos] = num;
        sem_post(&csem);
        printf("producer: %d\n", num);
        pos = (pos+1) % NUM;
        sleep(rand()%2);
    }
    printf("producer count=%d\n", count);
}
/*
 * Consumer thread body: takes 12 values out of the ring buffer `queue`.
 * It waits on csem before reading a slot and posts psem afterwards; the
 * sum of everything consumed is printed at the end.  arg is unused.
 * The messages now end with a real newline.
 * NOTE(review): declared void but started through a cast in main(); a
 * pthread start routine is expected to return void *.
 */
void consumer(void* arg){
    int pos = 0;
    int num, count = 0;
    int i;
    for (i=0; i<12; ++i) {
        sem_wait(&csem);
        num = queue[pos];
        sem_post(&psem);
        printf("consumer: %d\n", num);
        count += num;
        pos = (pos+1) % NUM;
        sleep(rand()%3);
    }
    printf("consumer count=%d\n", count);
}
/*
 * Sets psem to NUM and csem to 0, starts the producer and the consumer,
 * waits for both and destroys the semaphores.
 */
int main() {
    pthread_t tid[2];
    int k;

    sem_init(&psem, 0, NUM);
    sem_init(&csem, 0, 0);

    pthread_create(&tid[0], NULL, (void*)producer, NULL);
    pthread_create(&tid[1], NULL, (void*)consumer, NULL);

    for (k = 0; k < 2; ++k) {
        pthread_join(tid[k], NULL);
    }

    sem_destroy(&psem);
    sem_destroy(&csem);
    return 0;
}

3-4.c运行截图如下:

3-5.c:

#include 
#include 
#include 
#include 
#include 
#include 
#include 
/* A mutex and a condition variable that are always used together. */
typedef struct condition
{
	pthread_mutex_t pmutex;	/* lock protecting the state the condition is about */
	pthread_cond_t pcond;	/* condition variable waited on with pmutex held */
}condition_t;
/* One queued unit of work: a callback, its argument and the link to the next task. */
typedef struct task
{
	void *(*run)(void *arg);	/* function executed by a worker thread */
	void *arg;			/* argument handed to run */
	struct task *next;		/* next task in the queue, NULL at the tail */
}task_t;
/* State of the thread pool. */
typedef struct threadpool
{
	condition_t ready;	/* lock/condition guarding every field below */
	task_t *first;		/* head of the task queue */
	task_t *last;		/* tail of the task queue */
	int counter;		/* number of worker threads currently alive */
	int idle;		/* number of workers currently waiting for work */
	int max_threads;	/* upper bound on counter */
	int quit;		/* set to 1 once the pool is being destroyed */
}threadpool_t;
/* Initialises the mutex, then the condition variable; returns the first
 * non-zero pthread status, or 0 when both succeeded. */
int condition_init(condition_t *cond)
{
	int rc = pthread_mutex_init(&cond->pmutex,NULL);
	if(rc != 0)
		return rc;
	rc = pthread_cond_init(&cond->pcond,NULL);
	return rc;
}
/* Locks the mutex of cond; returns the pthread status. */
int condition_lock(condition_t *cond)
{
	pthread_mutex_t *m = &cond->pmutex;
	return pthread_mutex_lock(m);
}
/* Unlocks the mutex of cond; returns the pthread status. */
int condition_unlock(condition_t *cond)
{
	pthread_mutex_t *m = &cond->pmutex;
	return pthread_mutex_unlock(m);
}
/* Waits on the condition variable with the mutex of cond. */
int condition_wait(condition_t *cond)
{
	pthread_cond_t *cv = &cond->pcond;
	pthread_mutex_t *m = &cond->pmutex;
	return pthread_cond_wait(cv,m);
}
/* Like condition_wait, but gives up at the absolute time abstime. */
int condition_timewait(condition_t *cond,const struct timespec *abstime)
{
	pthread_cond_t *cv = &cond->pcond;
	pthread_mutex_t *m = &cond->pmutex;
	return pthread_cond_timedwait(cv,m,abstime);
}
/* Signals the condition variable of cond. */
int condition_signal(condition_t *cond)
{
	pthread_cond_t *cv = &cond->pcond;
	return pthread_cond_signal(cv);
}
/* Broadcasts on the condition variable of cond. */
int condition_broadcast(condition_t *cond)
{
	pthread_cond_t *cv = &cond->pcond;
	return pthread_cond_broadcast(cv);
}
/* Destroys the mutex, then the condition variable; returns the first
 * non-zero pthread status, or 0 when both succeeded. */
int condition_destory(condition_t *cond)
{
	int rc = pthread_mutex_destroy(&cond->pmutex);
	if(rc != 0)
		return rc;
	rc = pthread_cond_destroy(&cond->pcond);
	return rc;
}
/*
 * Worker loop run by every pool thread; arg is the owning threadpool_t.
 *
 * Each pass takes the pool lock, waits (at most two seconds per wait) for a
 * task or for the quit flag, runs one task with the lock released, and
 * leaves the loop when the pool is quitting with an empty queue or when the
 * wait timed out with nothing queued.  Always returns NULL.
 *
 * Fixes over the original: the messages end with a real newline, and the
 * thread id is printed as unsigned long instead of being truncated to int.
 * NOTE(review): the cast assumes an integer pthread_t, as on Linux.
 */
void *thread_routine(void *arg)
{
	struct timespec abstime;
	int timeout;
	unsigned long self = (unsigned long)pthread_self();
	threadpool_t *pool = (threadpool_t *)arg;
	printf("thread 0x%lx is starting\n",self);
	while(1)
	{
		timeout = 0;
		condition_lock(&pool -> ready);
		pool -> idle++;
		// wait until a task is queued or the pool is being destroyed
		while(pool -> first == NULL && !pool -> quit)
		{
			printf("thread 0x%lx is waiting\n",self);
			clock_gettime(CLOCK_REALTIME,&abstime);
			abstime.tv_sec += 2;
			int status=condition_timewait(&pool -> ready,&abstime);
			if(status == ETIMEDOUT)
			{
				printf("thread 0x%lx is wait timed out\n",self);
				timeout = 1;
				break;
			}
		}
		// done waiting: this thread is no longer idle
		pool -> idle--;
		if(pool -> first != NULL)
		{
			task_t *t = pool -> first;
			pool -> first = t -> next;
			// unlock while the task runs so new tasks can be added and other workers can wait
			condition_unlock(&pool -> ready);
			t -> run(t->arg);
			free(t);
			condition_lock(&pool -> ready);
		}
		// the pool is quitting and nothing is left to run
		if(pool -> quit && pool ->first == NULL)
		{
			pool -> counter--;
			if(pool->counter == 0)
			{
				// last worker out wakes the thread blocked in threadpool_destory()
				condition_signal(&pool -> ready);
			}
			// unlock before leaving the loop
			condition_unlock(&pool->ready);
			break;
		}
		if(timeout &&pool -> first ==NULL)
		{
			pool -> counter--;
			// unlock before leaving the loop
			condition_unlock(&pool->ready);
			break;
		}
		condition_unlock(&pool -> ready);
	}
	printf("thread 0x%lx is exiting\n",self);
	return NULL;
}
/*
 * Initialises an empty pool: no tasks, no threads, not quitting.
 * threads is stored as the maximum number of worker threads.
 */
void threadpool_init(threadpool_t *pool, int threads)
{
	condition_init(&pool->ready);
	pool->first = pool->last = NULL;
	pool->counter = pool->idle = 0;
	pool->max_threads = threads;
	pool->quit = 0;
}
/*
 * Queues one task (run, arg) and makes sure a worker will pick it up:
 * an idle worker is signalled if there is one, otherwise a new worker is
 * started while fewer than max_threads exist.
 *
 * Fixes over the original: a failed malloc no longer dereferences NULL (the
 * task is dropped and arg stays with the caller), a worker is counted only
 * when pthread_create() succeeded, and new workers are detached because
 * nothing ever joins them.
 */
void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg),void *arg)
{
	task_t *newstask = (task_t *)malloc(sizeof(task_t));
	if(newstask == NULL)
	{
		fprintf(stderr,"threadpool_add_task: out of memory, task dropped\n");
		return;
	}
	newstask->run = run;
	newstask->arg = arg;
	newstask -> next = NULL;

	condition_lock(&pool -> ready);
	// append the task to the queue
	if(pool -> first ==NULL)
	{
		pool -> first = newstask;
	}
	else
		pool -> last -> next = newstask;
	pool -> last = newstask;
	// wake one waiting worker if there is any
	if(pool -> idle > 0)
	{
		condition_signal(&pool -> ready);
	}
	else if(pool -> counter < pool -> max_threads)
	{
		pthread_t tid;
		if(pthread_create(&tid,NULL,thread_routine,pool) == 0)
		{
			pthread_detach(tid);
			pool -> counter++;
		}
		else
		{
			// the task stays queued for an existing worker, if any
			fprintf(stderr,"threadpool_add_task: cannot create worker thread\n");
		}
	}
	condition_unlock(&pool -> ready);
}
/*
 * Shuts the pool down: sets the quit flag, wakes idle workers, blocks until
 * the worker count has dropped to zero and then destroys the lock/condition
 * pair.  Does nothing when the quit flag is already set.
 */
void  threadpool_destory(threadpool_t *pool)
{
	if(pool->quit != 0)
		return;

	condition_lock(&pool->ready);
	pool->quit = 1;
	if(pool->counter > 0 && pool->idle > 0)
		condition_broadcast(&pool->ready);
	/* the last worker to leave signals this condition */
	while(pool->counter > 0)
		condition_wait(&pool->ready);
	condition_unlock(&pool->ready);
	condition_destory(&pool->ready);
}
/*
 * Demo task: prints the calling thread and the task number, sleeps one
 * second and frees its argument.
 * arg: heap-allocated int holding the task number; freed here.
 * The message now ends with a real newline and the thread id is printed as
 * unsigned long instead of being truncated to int.
 */
void *mytask(void *arg)
{
	printf("thread 0x%lx is working on task %d\n",(unsigned long)pthread_self(),*(int*)arg);
	sleep(1);
	free(arg);
	return NULL;
}
/*
 * Creates a pool of at most three workers, queues ten numbered tasks,
 * sleeps 15 seconds and destroys the pool.
 * A failed malloc of a task argument now stops the queuing instead of
 * writing through NULL.
 */
int main()
{
	threadpool_t pool;
	threadpool_init(&pool,3);
	int i ;
	for(i = 0; i < 10; i++)
	{
		int *arg = (int *)malloc(sizeof(int));
		if(arg == NULL)
			break;
		*arg = i;
		threadpool_add_task(&pool,mytask,arg);	/* mytask frees arg */
	}

	sleep(15);
	threadpool_destory(&pool);
	return 0;
}

3-5.c运行截图如下:

2.参考https://www.cnblogs.com/jiangzhaowei/p/10383049.html编写复杂的线程池;
main.c:

#include  
#include  
#include  
#include  
#include  
#include  
#include 
#include "lib_thread_pool.h"
/* Task bodies handed to the pool; each one loops until nKillThread equals its own number. */
static void* thread_1(void* arg);
static void* thread_2(void* arg);
static void* thread_3(void* arg);
/* Prints the pool's four counters. */
static void DisplayPoolStatus(CThread_pool_t* pPool);
/* Written by main(), polled by the task threads.
 * NOTE(review): a plain int shared between threads without synchronisation -- consider an atomic type. */
int nKillThread = 0;
/*
 * Demo driver: builds a pool (max 2 threads, 1 kept free), queues three
 * tasks, releases them one by one through nKillThread while printing the
 * pool counters, and finally destroys the pool.
 *
 * Fixes over the original: a failed ThreadPoolConstruct() is reported
 * instead of dereferencing NULL, and the integer argument goes through
 * `long` on its way into the void * so the cast does not change width.
 * NOTE(review): the short usleep() calls are the only ordering between
 * main() and the tasks; the printed counters depend on scheduling.
 */
int main()
{
	CThread_pool_t* pThreadPool = NULL;
	int nNumInput = 5;
	char LogInput[] = "OK!";

	pThreadPool = ThreadPoolConstruct(2, 1);
	if (NULL == pThreadPool)
	{
		printf("ThreadPoolConstruct failed!\n");
		return 1;
	}
	DisplayPoolStatus(pThreadPool);

	pThreadPool->AddWorkUnlimit((void*)pThreadPool, thread_1, (void*)NULL);

	usleep(10);
	pThreadPool->AddWorkUnlimit((void*)pThreadPool, thread_2, (void*)(long)nNumInput);
	usleep(10);
	pThreadPool->AddWorkUnlimit((void*)pThreadPool, thread_3, (void*)LogInput);
	usleep(10);
	DisplayPoolStatus(pThreadPool);
	nKillThread = 1;
	usleep(100);
	DisplayPoolStatus(pThreadPool);
	nKillThread = 2;
	usleep(100);
	DisplayPoolStatus(pThreadPool);
	nKillThread = 3;
	usleep(100);
	DisplayPoolStatus(pThreadPool);
	pThreadPool->Destruct((void*)pThreadPool);
	return 0;
}
/*
 * Task 1: announces itself, then polls every 10 microseconds until
 * nKillThread becomes 1.  arg is unused; returns NULL.
 * The message now ends with a real newline.
 */
static void* thread_1(void* arg)
{
	(void)arg;
	printf("Thread 1 is running !\n");
	while(nKillThread != 1)
		usleep(10);
	return NULL;
}
/*
 * Task 2: prints the integer carried inside arg, then polls every
 * 10 microseconds until nKillThread becomes 2.  Returns NULL.
 * arg carries the integer value itself, not a pointer to it; it is
 * converted through `long` so the pointer is not narrowed in one step.
 * The messages now end with a real newline.
 */
static void* thread_2(void* arg)
{
	int nNum = (int)(long)arg;

	printf("Thread 2 is running !\n");
	printf("Get Number %d\n", nNum);
	while(nKillThread != 2)
		usleep(10);
	return NULL;
}
/*
 * Task 3: prints the string arg points to, then polls every
 * 10 microseconds until nKillThread becomes 3.  Returns NULL.
 * The messages now end with a real newline.
 */
static void* thread_3(void* arg)
{
	char *pLog = (char*)arg;

	printf("Thread 3 is running !\n");
	printf("Get String %s\n", pLog);
	while(nKillThread != 3)
		usleep(10);
	return NULL;
}
/*
 * Prints a numbered snapshot of the pool's counters, read through the
 * pool's own accessor function pointers.  Each line now ends with a real
 * newline.
 */
static void DisplayPoolStatus(CThread_pool_t* pPool)
{
	static int nCount = 1;	/* sequence number of the snapshot */

	printf("******************\n");
	printf("nCount = %d\n", nCount++);
	printf("max_thread_num = %d\n", pPool->GetMaxThreadNum((void*)pPool));
	printf("current_pthread_num = %d\n", pPool->GetCurThreadNum((void*)pPool));
	printf("current_pthread_task_num = %d\n", pPool->GetCurTaskThreadNum((void*)pPool));
	printf("cur_queue_size = %d\n", pPool->GetCurTaskNum((void*)pPool));
	printf("******************\n");
}

lib_thread_pool.c:


#include  
#include  
#include  
#include  
#include  
#include  
#include 
#include "lib_thread_pool.h"



static void * ThreadPoolRoutine (void *arg); 


/*
 * Queues a task and, when every existing thread is already accounted for by
 * a task and the pool is below max_thread_num, starts one more worker.
 *
 * pthis:   the CThread_pool_t to add to.
 * process: function the worker will call.
 * arg:     argument handed to process.
 * Returns 0 when the task was queued, -1 when it was not (out of memory or
 * a needed worker could not be started; the queue is then left unchanged).
 *
 * Fixes over the original: realloc() no longer overwrites threadid directly
 * (a failure lost the old array and the next line wrote through NULL), and
 * the result of pthread_create() is checked before the thread is counted.
 */
static int ThreadPoolAddWorkLimit(void* pthis,void *(*process) (void *arg), void *arg) 
{ 
	CThread_pool_t *pool = (CThread_pool_t *)pthis;
	worker_t *newworker = (worker_t *) malloc (sizeof (worker_t));
	worker_t *member;
	int FreeThreadNum;

	if (NULL == newworker)
	{
		return -1;
	}
	newworker->process 	= process;
	newworker->arg 		= arg;
	newworker->next 	= NULL;

	pthread_mutex_lock(&(pool->queue_lock));

	/* Append at the tail; member ends up as the old tail (NULL if the queue was empty). */
	member = pool->queue_head;
	if (member != NULL)
	{
		while (member->next != NULL)
		{
			member = member->next;
		}
		member->next = newworker;
	}
	else
	{
		pool->queue_head = newworker;
	}

	assert (pool->queue_head != NULL);
	pool->cur_queue_size++;

	FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num;
	if((0 == FreeThreadNum) && (pool->current_pthread_num < pool->max_thread_num))
	{
		int current_pthread_num = pool->current_pthread_num;
		pthread_t *grown = (pthread_t *) realloc(pool->threadid,(current_pthread_num + 1) * sizeof (pthread_t));

		if (NULL != grown)
		{
			pool->threadid = grown;
		}
		if ((NULL == grown) ||
		    (pthread_create (&(pool->threadid[current_pthread_num]), NULL, ThreadPoolRoutine, (void*)pool) != 0))
		{
			/* Undo the enqueue; the lock is still held, so no worker has seen the task. */
			if (member != NULL)
			{
				member->next = NULL;
			}
			else
			{
				pool->queue_head = NULL;
			}
			pool->cur_queue_size--;
			pthread_mutex_unlock (&(pool->queue_lock));
			free (newworker);
			return -1;
		}
		pool->current_pthread_num++;
	}

	pool->current_pthread_task_num++;
	pthread_mutex_unlock(&(pool->queue_lock));
	pthread_cond_signal(&(pool->queue_ready));
//	usleep(10);	// add if needed
	return 0;
} 

/*
 * Queues a task and, when every existing thread is already accounted for by
 * a task, starts one more worker regardless of max_thread_num (which is
 * raised to the new thread count if that is exceeded).
 *
 * pthis:   the CThread_pool_t to add to.
 * process: function the worker will call.
 * arg:     argument handed to process.
 * Returns 0 when the task was queued, -1 when it was not (out of memory or
 * the worker could not be started; the queue is then left unchanged).
 *
 * Fixes over the original: realloc() no longer overwrites threadid directly
 * (a failure lost the old array and the next line wrote through NULL), and
 * the result of pthread_create() is checked before the thread is counted.
 */
static int ThreadPoolAddWorkUnlimit(void* pthis,void *(*process) (void *arg), void *arg) 
{ 
	CThread_pool_t *pool = (CThread_pool_t *)pthis;
	worker_t *newworker = (worker_t *) malloc (sizeof (worker_t));
	worker_t *member;
	int FreeThreadNum;

	if (NULL == newworker)
	{
		return -1;
	}
	newworker->process 	= process;
	newworker->arg 		= arg;
	newworker->next 	= NULL;

	pthread_mutex_lock(&(pool->queue_lock));

	/* Append at the tail; member ends up as the old tail (NULL if the queue was empty). */
	member = pool->queue_head;
	if (member != NULL)
	{
		while (member->next != NULL)
		{
			member = member->next;
		}
		member->next = newworker;
	}
	else
	{
		pool->queue_head = newworker;
	}

	assert (pool->queue_head != NULL);
	pool->cur_queue_size++;

	FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num;
	if(0 == FreeThreadNum)
	{
		int current_pthread_num = pool->current_pthread_num;
		pthread_t *grown = (pthread_t *) realloc(pool->threadid,(current_pthread_num + 1) * sizeof (pthread_t));

		if (NULL != grown)
		{
			pool->threadid = grown;
		}
		if ((NULL == grown) ||
		    (pthread_create (&(pool->threadid[current_pthread_num]), NULL, ThreadPoolRoutine, (void*)pool) != 0))
		{
			/* Undo the enqueue; the lock is still held, so no worker has seen the task. */
			if (member != NULL)
			{
				member->next = NULL;
			}
			else
			{
				pool->queue_head = NULL;
			}
			pool->cur_queue_size--;
			pthread_mutex_unlock (&(pool->queue_lock));
			free (newworker);
			return -1;
		}
		pool->current_pthread_num++;
		if (pool->current_pthread_num > pool->max_thread_num)
		{
			pool->max_thread_num = pool->current_pthread_num;
		}
	}

	pool->current_pthread_task_num++;
	pthread_mutex_unlock(&(pool->queue_lock));
	pthread_cond_signal(&(pool->queue_ready));
//	usleep(10);	// add if needed
	return 0;
} 

/* Returns max_thread_num, read with the queue lock held. */
static int ThreadPoolGetThreadMaxNum(void* pthis) 
{ 
	CThread_pool_t *self = (CThread_pool_t *)pthis;
	int snapshot;

	pthread_mutex_lock(&self->queue_lock);
	snapshot = self->max_thread_num;
	pthread_mutex_unlock(&self->queue_lock);

	return snapshot;
} 

/* Returns current_pthread_num, read with the queue lock held. */
static int ThreadPoolGetCurrentThreadNum(void* pthis) 
{ 
	CThread_pool_t *self = (CThread_pool_t *)pthis;
	int snapshot;

	pthread_mutex_lock(&self->queue_lock);
	snapshot = self->current_pthread_num;
	pthread_mutex_unlock(&self->queue_lock);

	return snapshot;
} 

/* Returns current_pthread_task_num, read with the queue lock held. */
static int ThreadPoolGetCurrentTaskThreadNum(void* pthis) 
{ 
	CThread_pool_t *self = (CThread_pool_t *)pthis;
	int snapshot;

	pthread_mutex_lock(&self->queue_lock);
	snapshot = self->current_pthread_task_num;
	pthread_mutex_unlock(&self->queue_lock);

	return snapshot;
} 

/* Returns cur_queue_size, read with the queue lock held. */
static int ThreadPoolGetCurrentTaskNum(void* pthis) 
{ 
	CThread_pool_t *self = (CThread_pool_t *)pthis;
	int snapshot;

	pthread_mutex_lock(&self->queue_lock);
	snapshot = self->cur_queue_size;
	pthread_mutex_unlock(&self->queue_lock);

	return snapshot;
} 

/*
 * Shuts the pool down and frees it.
 *
 * pthis: the CThread_pool_t to destroy; it must not be used afterwards.
 * Returns 0 on success, -1 when shutdown was already requested.
 *
 * Sets the shutdown flag, wakes every waiting worker, joins the threads
 * recorded in threadid, frees tasks still queued (their arg is not
 * touched), then destroys the lock and condition and frees the pool.
 *
 * Fix over the original: the flag is set and the broadcast issued while
 * queue_lock is held.  Without the lock a worker that had just tested
 * shutdown but not yet started waiting could miss the broadcast and sleep
 * forever, leaving pthread_join() blocked.
 * NOTE(review): workers that left on their own lower current_pthread_num
 * but keep their slot in threadid, so the slots joined here may not match
 * the live threads exactly -- confirm.
 */
static int ThreadPoolDestroy (void *pthis) 
{ 
	CThread_pool_t *pool = (CThread_pool_t *)pthis;
	worker_t *head = NULL;
	int thread_count;
	int i;

	pthread_mutex_lock (&(pool->queue_lock));
	if (pool->shutdown)
	{
		pthread_mutex_unlock (&(pool->queue_lock));
		return -1;
	}
	pool->shutdown = 1;
	thread_count = pool->current_pthread_num;
	pthread_cond_broadcast (&(pool->queue_ready));
	pthread_mutex_unlock (&(pool->queue_lock));

	for (i = 0; i < thread_count; i++)
	{
		pthread_join (pool->threadid[i], NULL);
	}

	free (pool->threadid);

	while (pool->queue_head != NULL)
	{
		head = pool->queue_head;
		pool->queue_head = pool->queue_head->next;
		free (head);
	}

	pthread_mutex_destroy(&(pool->queue_lock));
	pthread_cond_destroy(&(pool->queue_ready));
	free (pool);
	return 0;
} 

/*
 * Worker loop; arg is the owning CThread_pool_t.
 *
 * Waits for a queued task or for shutdown, runs one task with the lock
 * released, and after each task lets the thread end when the number of
 * threads not accounted for by a task exceeds free_pthread_num.
 * Returns NULL.
 *
 * Fix over the original: current_pthread_num is decremented while
 * queue_lock is still held; it used to be changed after the unlock,
 * racing with every other reader and writer of the counter.
 * NOTE(review): a worker that leaves this way is neither joined nor
 * detached and its slot in threadid is not reclaimed -- confirm.
 */
static void * ThreadPoolRoutine (void *arg) 
{ 
	CThread_pool_t *pool = (CThread_pool_t *)arg;

	while (1)
	{
		pthread_mutex_lock (&(pool->queue_lock));

		while ((pool->cur_queue_size == 0) && (!pool->shutdown))
		{
			pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));
		}
		if (pool->shutdown)
		{
			/* The count is left alone here: ThreadPoolDestroy joins by it. */
			pthread_mutex_unlock (&(pool->queue_lock));
			pthread_exit (NULL);
		}
		assert (pool->cur_queue_size != 0);
		assert (pool->queue_head != NULL);

		/* Take the head task out of the queue. */
		pool->cur_queue_size--;
		worker_t *worker 	= pool->queue_head;
		pool->queue_head 	= worker->next;

		/* Run it without the lock so other threads can use the queue. */
		pthread_mutex_unlock (&(pool->queue_lock));
		(*(worker->process)) (worker->arg);
		pthread_mutex_lock (&(pool->queue_lock));

		pool->current_pthread_task_num--;
		free (worker);
		worker = NULL;
		if ((pool->current_pthread_num - pool->current_pthread_task_num) > pool->free_pthread_num)
		{
			pool->current_pthread_num--;
			pthread_mutex_unlock (&(pool->queue_lock));
			break;
		}
		pthread_mutex_unlock (&(pool->queue_lock));
	}

	return NULL;
} 

CThread_pool_t* ThreadPoolConstruct(int max_num,int free_num) 
{ 
    CThread_pool_t *pool = (CThread_pool_t *) malloc (sizeof (CThread_pool_t)); 
	if (NULL == pool)
	{
		return NULL;
	}
	memset(pool, 0, sizeof(CThread_pool_t));
	
    pthread_mutex_init (&(pool->queue_lock), NULL);	
    pthread_cond_init (&(pool->queue_ready), NULL);	 
    pool->queue_head 				= NULL; 
    pool->max_thread_num 			= max_num;	
    pool->cur_queue_size 			= 0; 
	pool->current_pthread_task_num 	= 0;
    pool->shutdown 					= 0; 
	pool->current_pthread_num 		= 0;
	pool->free_pthread_num 			= free_num;	
	pool->threadid					= NULL;
    pool->threadid 					= (pthread_t *) malloc (max_num * sizeof (pthread_t)); 
	pool->AddWorkUnlimit			= ThreadPoolAddWorkUnlimit;	
	pool->AddWorkLimit				= ThreadPoolAddWorkLimit;
	pool->Destruct					= ThreadPoolDestroy;
	pool->GetMaxThreadNum			= ThreadPoolGetThreadMaxNum;
	pool->GetCurThreadNum			= ThreadPoolGetCurrentThreadNum;
	pool->GetCurTaskThreadNum		= ThreadPoolGetCurrentTaskThreadNum;
	pool->GetCurTaskNum				= ThreadPoolGetCurrentTaskNum;
	
    int i = 0; 
    for (i = 0; i < max_num; i++) 
    {  
		pool->current_pthread_num++;	
        pthread_create (&(pool->threadid[i]), NULL, ThreadPoolRoutine, (void*)pool);	
		usleep(1000);
    } 
	return pool;
} 

/*
 * Allocates a pool with the DEFAULT_* limits and no threads started yet
 * (threadid stays NULL).  Returns the pool, or NULL if malloc fails.
 */
CThread_pool_t* ThreadPoolConstructDefault(void) 
{ 
	CThread_pool_t *self = (CThread_pool_t *) malloc (sizeof (CThread_pool_t));

	if (self == NULL)
		return NULL;
	memset(self, 0, sizeof(CThread_pool_t));

	pthread_mutex_init(&self->queue_lock, NULL);
	pthread_cond_init(&self->queue_ready, NULL);

	/* queue and counters */
	self->queue_head               = NULL;
	self->cur_queue_size           = 0;
	self->current_pthread_task_num = 0;
	self->current_pthread_num      = 0;
	self->shutdown                 = 0;
	self->threadid                 = NULL;

	/* limits */
	self->max_thread_num           = DEFAULT_MAX_THREAD_NUM;
	self->free_pthread_num         = DEFAULT_FREE_THREAD_NUM;

	/* operations */
	self->AddWorkUnlimit           = ThreadPoolAddWorkUnlimit;
	self->AddWorkLimit             = ThreadPoolAddWorkLimit;
	self->Destruct                 = ThreadPoolDestroy;
	self->GetMaxThreadNum          = ThreadPoolGetThreadMaxNum;
	self->GetCurThreadNum          = ThreadPoolGetCurrentThreadNum;
	self->GetCurTaskThreadNum      = ThreadPoolGetCurrentTaskThreadNum;
	self->GetCurTaskNum            = ThreadPoolGetCurrentTaskNum;

	return self;
}
lib_thread_pool.h:


#ifndef __PTHREAD_POOL_H__
#define __PTHREAD_POOL_H__
#include 

#define DEFAULT_MAX_THREAD_NUM		100	/* max_thread_num used by ThreadPoolConstructDefault() */
#define DEFAULT_FREE_THREAD_NUM		10	/* free_pthread_num used by ThreadPoolConstructDefault() */
typedef struct worker_t worker_t;		/* one queued task */
typedef struct CThread_pool_t CThread_pool_t;	/* the pool object */


/* One queued task: a callback, its argument and the link to the next task. */
struct worker_t
{
	void *(*process) (void *arg);	/* function a worker thread calls */
    int   paratype;					/* NOTE(review): not referenced by the functions shown in lib_thread_pool.c -- purpose unknown */
    void *arg;						/* argument handed to process */
    struct worker_t *next;			/* next task in the queue, NULL at the tail */
};

struct CThread_pool_t
{
	pthread_mutex_t queue_lock;	
    pthread_cond_t queue_ready;	
    worker_t *queue_head;	
    int shutdown;			
    pthread_t *threadid;	
    int max_thread_num;		
	int current_pthread_num;	
	int current_pthread_task_num;	
    int cur_queue_size;		
	int	free_pthread_num;	
	
	
	int (*AddWorkUnlimit)(void* pthis,void *(*process) (void *arg), void *arg); 
	
	int (*AddWorkLimit)(void* pthis,void *(*process) (void *arg), void *arg); 
	
	
	int (*GetMaxThreadNum) (void *pthis); 
		
	int (*GetCurThreadNum) (void *pthis); 
	
	
	int (*GetCurTaskThreadNum) (void *pthis); 
	
	
	int (*GetCurTaskNum) (void *pthis); 
	
	
	int (*Destruct) (void *pthis); 
};


CThread_pool_t* ThreadPoolConstruct(int max_num,int free_num);

CThread_pool_t* ThreadPoolConstructDefault(void);
#endif

Makefile:

# Builds the thread-pool demo.
#   make        - compile both sources and link the `edit` executable
#   make clean  - remove the object files and the linked executable
CC     = gcc
CFLAGS = -pthread

edit: main.o lib_thread_pool.o
	$(CC) $(CFLAGS) -o edit main.o lib_thread_pool.o
main.o: main.c lib_thread_pool.h
	$(CC) $(CFLAGS) -c main.c
lib_thread_pool.o: lib_thread_pool.c lib_thread_pool.h
	$(CC) $(CFLAGS) -c lib_thread_pool.c
.PHONY : clean
clean:
	rm -f edit *.o

运行:

make
./edit
make clean

程序运行截图:

3.改造属于自己的线程池代码
threadpool.h:

#ifndef __THREAD_POOL_H__
#define __THREAD_POOL_H__
/* Provides pthread_t, pthread_mutex_t and pthread_cond_t used below. */
#include <pthread.h>

/* Signature of the function a queued task runs. */
typedef void *(*pool_task_f)(void *arg);

/* One node of the pool's singly linked task queue. */
typedef struct _task{
	pool_task_f process;	/* function executed by a worker thread */

	void *arg;		/* argument handed to process */

	struct _task *next;	/* next queued task, NULL at the tail */
}pool_task;

/* Pool state shared by the worker threads and the task producers. */
typedef struct{
	pthread_t *threadid;	/* array of threads_limit worker thread ids */

	int threads_limit;	/* number of workers; also the cap on queued tasks */

	int destroy_flag;	/* set to 1 by pool_uninit to make the workers exit */

	pool_task *queue_head;	/* head of the task queue, NULL when empty */

	int task_in_queue;	/* number of tasks currently queued */

	pthread_mutex_t queue_lock;	/* protects the queue and the flags above */

	pthread_cond_t queue_ready;	/* signalled when a task is queued or the pool is destroyed */

}pool_t;

/**
 * Initialise *pool and start its worker threads.
 * @param pool           caller-owned pool object to initialise
 * @param threads_limit  number of worker threads to start
 */
void pool_init(pool_t *pool, int threads_limit);

/**
 * Stop the workers, join them and release the pool's resources.
 * @return 0 on success, -1 if the pool was already flagged as destroyed
 */
int pool_uninit(pool_t *pool);

/**
 * Queue process(arg) for execution by a worker thread.
 * @return int status code; the implementation returns 0 for an accepted task
 */
int pool_add_task(pool_t *pool, pool_task_f process, void *arg);
#endif

threadpool.c:

#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include "threadpool.h"
static void *pool_thread_server(void *arg);

/**
 * Initialise a pool and start its worker threads.
 *
 * The queue, the flags, the mutex and the condition variable are always
 * initialised, so pool_uninit() may be called on the pool afterwards even
 * when no thread could be started.
 *
 * pool->threads_limit ends up holding the number of threads that were
 * actually created. That can be lower than the request when the thread-id
 * array cannot be allocated or pthread_create() fails, so pool_uninit()
 * never joins a thread that does not exist.
 *
 * @param pool           caller-owned pool object to initialise
 * @param threads_limit  requested number of workers; values <= 0 start none
 */
void pool_init(pool_t *pool, int threads_limit){
	int created = 0;

	pool->queue_head = NULL;
	pool->task_in_queue = 0;
	pool->destroy_flag = 0;
	pool->threads_limit = 0;
	pool->threadid = NULL;

	/* Must exist before the first worker runs: workers lock the mutex at once. */
	pthread_mutex_init(&(pool->queue_lock), NULL);
	pthread_cond_init(&(pool->queue_ready), NULL);

	if (threads_limit <= 0)
		return;

	pool->threadid = calloc((size_t)threads_limit, sizeof(*pool->threadid));
	if (pool->threadid == NULL)
		return;

	while (created < threads_limit) {
		if (pthread_create(&(pool->threadid[created]), NULL,
				   pool_thread_server, pool) != 0)
			break;	/* keep the workers that did start */
		created++;
	}
	pool->threads_limit = created;
}

/**
 * Shut the pool down and release everything it owns.
 *
 * Sets destroy_flag, wakes every waiting worker, joins all threads_limit
 * workers, frees the thread-id array and any task still queued (queued
 * tasks are discarded, not run), then destroys the mutex and the condition
 * variable.
 *
 * @param pool  pool previously set up with pool_init()
 * @return 0 on success, -1 if destroy_flag was already set
 */
int pool_uninit(pool_t *pool){
	pool_task *head = NULL;
	int i;

	pthread_mutex_lock(&(pool->queue_lock));
	if (pool->destroy_flag) {
		/* Release the lock on the error path too, otherwise every
		 * worker and producer blocks forever. */
		pthread_mutex_unlock(&(pool->queue_lock));
		return -1;
	}
	pool->destroy_flag = 1;
	pthread_mutex_unlock(&(pool->queue_lock));

	/* Workers sleeping on the condition variable re-check destroy_flag. */
	pthread_cond_broadcast(&(pool->queue_ready));

	for (i = 0; i < pool->threads_limit; i++)
		pthread_join(pool->threadid[i], NULL);
	free(pool->threadid);
	pool->threadid = NULL;

	/* Drop whatever was queued but never picked up. */
	pthread_mutex_lock(&(pool->queue_lock));
	while (pool->queue_head != NULL) {
		head = pool->queue_head;
		pool->queue_head = head->next;
		free(head);
	}
	pool->task_in_queue = 0;
	pthread_mutex_unlock(&(pool->queue_lock));

	pthread_mutex_destroy(&(pool->queue_lock));
	pthread_cond_destroy(&(pool->queue_ready));
	return 0;
}

/**
 * Append one task to the tail of the pool's queue and wake one worker.
 *
 * The queue holds at most pool->threads_limit tasks; a task offered while
 * the queue is full is rejected, not blocked on.
 *
 * @param pool     pool to queue the task on
 * @param process  function the worker will call; must not be NULL
 * @param arg      opaque argument passed to process
 * @return 0 when the task was queued, -1 when pool or process is NULL, the
 *         queue is full, or the task node could not be allocated
 */
static int enqueue_task(pool_t *pool, pool_task_f process, void *arg){
	pool_task *task = NULL;
	pool_task *member = NULL;

	if (pool == NULL || process == NULL)
		return -1;

	pthread_mutex_lock(&(pool->queue_lock));
	if (pool->task_in_queue >= pool->threads_limit) {
		printf("task_in_queue > threads_limit!\n");
		pthread_mutex_unlock(&(pool->queue_lock));
		return -1;
	}

	task = calloc(1, sizeof(*task));
	if (task == NULL) {
		/* Report the failure instead of aborting with the lock held. */
		pthread_mutex_unlock(&(pool->queue_lock));
		return -1;
	}
	task->process = process;
	task->arg = arg;
	task->next = NULL;

	/* Walk to the tail so tasks run in the order they were added. */
	member = pool->queue_head;
	if (member == NULL) {
		pool->queue_head = task;
	} else {
		while (member->next != NULL)
			member = member->next;
		member->next = task;
	}
	pool->task_in_queue++;
	printf("\ttasks %d\n", pool->task_in_queue);

	pthread_cond_signal(&(pool->queue_ready));
	pthread_mutex_unlock(&(pool->queue_lock));
	return 0;
}

/**
 * Block until a task is available and remove it from the head of the queue.
 *
 * When the pool's destroy_flag is set the calling thread terminates itself
 * with pthread_exit() instead of returning; tasks still queued at that
 * point are left for pool_uninit() to free.
 *
 * @param pool  pool whose queue is consumed
 * @return the dequeued task; the caller owns it and must free() it
 */
static pool_task *dequeue_task(pool_t *pool){
	pool_task *task = NULL;

	pthread_mutex_lock(&(pool->queue_lock));

	/* Loop because pthread_cond_wait() may wake spuriously, and because
	 * another worker may already have taken the task that was signalled. */
	while ((pool->task_in_queue == 0) && (!pool->destroy_flag)) {
		printf("thread 0x%lx is leisure\n", (unsigned long)pthread_self());
		pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));
	}

	if (pool->destroy_flag) {
		pthread_mutex_unlock(&(pool->queue_lock));
		printf("thread 0x%lx will be destroyed\n", (unsigned long)pthread_self());
		pthread_exit(NULL);
	}

	pool->task_in_queue--;
	task = pool->queue_head;
	pool->queue_head = task->next;
	printf("thread 0x%lx received a task\n", (unsigned long)pthread_self());

	pthread_mutex_unlock(&(pool->queue_lock));
	return task;
}

/**
 * Public entry point: queue process(arg) for execution by a worker.
 *
 * @return 0 when the task was queued, -1 when it was rejected (NULL pool or
 *         function, full queue, or out of memory)
 */
int pool_add_task(pool_t *pool, pool_task_f process, void *arg){
	return enqueue_task(pool, process, arg);
}

/**
 * Body of every worker thread: fetch a task, run it, free it, repeat.
 *
 * The loop has no exit of its own. The thread ends inside dequeue_task(),
 * which calls pthread_exit() once the pool's destroy_flag is set.
 *
 * @param arg  the pool_t this worker belongs to
 */
static void *pool_thread_server(void *arg){
	pool_t *pool = (pool_t *)arg;

	for (;;) {
		pool_task *task = dequeue_task(pool);

		/* Nothing handed out on this round; ask again. */
		if (task == NULL)
			continue;

		printf("thread 0x%lx is busy\n", (unsigned long)pthread_self());
		task->process(task->arg);
		free(task);	/* the worker owns the node once it is dequeued */
	}
}

main.c:

#include <stdint.h>
#include <stdio.h>
#include <unistd.h>
#include "threadpool.h"

/**
 * Demo task: print the task number, then sleep for one second.
 *
 * @param arg  task number smuggled through the pointer value (not an address)
 * @return always NULL
 */
void *task_test(void *arg){
	/* Go through intptr_t: a direct pointer-to-int cast truncates on LP64. */
	printf("\t\tworking on task %d\n", (int)(intptr_t)arg);
	sleep(1);

	return NULL;
}
/**
 * Demo driver: start a two-thread pool, queue five tasks one second apart,
 * wait four seconds, then tear the pool down.
 *
 * @return 0 on success, 1 if the pool could not be torn down
 */
int main (int argc, char *argv[]){
	pool_t pool;
	int i = 0;

	(void)argc;
	(void)argv;

	pool_init(&pool, 2);	/* initialise the thread pool */
	sleep(1);
	for(i = 0; i < 5; i++){
		sleep(1);
		/* Add one task; the task number travels in the pointer value. */
		if(pool_add_task(&pool, task_test, (void *)(intptr_t)i) != 0)
			fprintf(stderr, "failed to queue task %d\n", i);
	}
	sleep(4);
	if(pool_uninit(&pool) != 0)	/* destroy the thread pool */
		return 1;
	return 0;
}

运行:

gcc -pthread main.c threadpool.c -o edit
./edit

运行截图如下:

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/zaji/5694463.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-12-17
下一篇2022-12-17

发表评论

登录后才能评论

评论列表(0条)

    保存