
具体代码如下:
以下是两个线程:
import java.util.*
public class Thread_List_Operation {
//假设有这么一个队列
static List list = new LinkedList()
public static void main(String[] args) {
Thread t
t = new Thread(new T1())
t.start()
t = new Thread(new T2())
t.start()
}
}
//线程T1,用来给list添加新元素
class T1 implements Runnable{
void getElemt(Object o){
Thread_List_Operation.list.add(o)
System.out.println(Thread.currentThread().getName() + "为队列添加了一个元素")
}
@Override
public void run() {
for (int i = 0i <10i++) {
getElemt(new Integer(1))
}
}
}
//线程T2,用来给list添加新元素
class T2 implements Runnable{
void getElemt(Object o){
Thread_List_Operation.list.add(o)
System.out.println(Thread.currentThread().getName() + "为队列添加了一个元素")
}
@Override
public void run() {
for (int i = 0i <10i++) {
getElemt(new Integer(1))
}
}
}
//结果(乱序)
Thread-0为队列添加了一个元素
Thread-1为队列添加了一个元素
Thread-0为队列添加了一个元素
Thread-1为队列添加了一个元素
Thread-1为队列添加了一个元素
Thread-1为队列添加了一个元素
Thread-1为队列添加了一个元素
Thread-1为队列添加了一个元素
Thread-1为队列添加了一个元素
Thread-1为队列添加了一个元素
Thread-1为队列添加了一个元素
Thread-1为队列添加了一个元素
Thread-0为队列添加了一个元素
Thread-0为队列添加了一个元素
Thread-0为队列添加了一个元素
Thread-0为队列添加了一个元素
Thread-0为队列添加了一个元素
Thread-0为队列添加了一个元素
Thread-0为队列添加了一个元素
Thread-0为队列添加了一个元素
我们在应用程序中, 要完成一些异步工作, 同时在异步工作中又要尽量少用switch case, 这时task任务继承方式就可以达到效果。我们可以在作业队列中分配或添加任务,然后作业队列负责完成任务。这里的工作队列可以是多线程的,也可以是单线程的。本文采用的是多线程的实现,单线程的可以留言给我。
设计一个多线程就可以参照线程池的框架, 它要包含几个部分:
a. 线程池管理器:用于创建并管理线程池
b. 工作线程: 线程池中实际执行的线程
c. 任务接口:将任务抽象出来,形成任务接口,从而可以通过继承来实现自己要做的事
d. 任务队列:这里是通过个queue来保存。
本文完成的工作队列是以跨平台为前提, 可以在多个平台上运行。并具有以下一些特点:
a. 作业队列是可以单线程也可以多线程
b. 作业队列有优先级,具有高优先级的作业将在队列中具有较低优先级的作业之前添加。因此,它们将在其他较低优先级作业之前处理。
c. 作业队列可以暂停,因此暂停时不能处理新作业。但是用户仍然可以将作业添加到队列中。一旦用户选择恢复作业队列,作业将被处理。
我们实现的通用线程池框架由五个重要部分组成ZLThread,ZLWorkerThread,ZLThreadManager,ZLThreadPool,ZLTask,除此之外框架中还包括线程同步使用的类ZLMutex和ZLCondition,其中重要部分的大体联系如下图:
a. ZLTask是所有的任务的基类,其提供一个接口DoTaskProc,所有的任务类都必须从该类继承,同时实现DoTaskProc方法。该方法中实现具体的任务逻辑。
b. ZLThread是线程的类抽象,其封装了各个系统线程最经常使用的属性和方法,是所有线程类的基类,具有一个接口Run。
c. ZLWorkerThread是实际被调度和执行的线程类,其从ZLThread继承而来,实现了ZLThread中的Run方法。
d. ZLThreadPool是线程池类,其负责保存线程,释放线程以及调度线程。
e. ZLThreadManager是线程池与用户的直接接口,其屏蔽了内部的具体实现。
f. ZLMutex用于线程之间的互斥。
g. ZLCondition则是条件变量的封装,用于线程之间的同步。
把线程与task任务进行统一管理,创建有限的线程数来处理task任务,如下图:
从图上可以看出主要含有三个队列,任务队列,工作线程队列,忙碌线程队列;任务队列是一个阻塞队列,任务不断地被push进来,这里要有一个线程来取任务,获取空闲线程, 交于空闲线程去处理,如果获取到任务,则将线程会进入忙碌线程队列中,执行任务的DoTaskProc工作,当工作完成,重新移出工作线程队列。
http://blog.csdn.net/ithzhang/article/details/9020283
开源: https://git.coding.net/clzhan/MulitiThreadJob.git
1、python提供两种方式使用多线程:一个是基于函数:_thread模块或者threading模块。一个是基于类:theading.Thread
使用多线程函数包装线程对象:_thread
_thead.start_new_thead(func,*args,**kwargs)
args,**kwargs是被包装函数的入参,必须传入元祖或字典
使用多线程函数包装线程对象:threading
threading._start_new_thread(func,*args,**kwargs):开启线程,带元祖或字典
threading.currentThread():返回当前线程变量
threading.enumerate():正在运行的线程列表,不含未启动和已结束线程
threading.activeCount():返回正在运行的线程数量
threading.settrace(func):为所有threading模块启动的线程设置追踪函数,在调用run方法之前,func会被传给追踪函数
threading.setprofile(func):为所有threading模块启动的线程设置性能测试函数,也是在run方法调用前就传递给性能测试函数
使用多线程类包装线程对象:threading.Thread
Thread类提供以下方法:
run():表示线程活动的方法,线程需要控制些什么活动都在这里面定义。当线程对象一但被创建,其活动一定会因调用线程的 start() 方法开始。这会在独立的控制线程调用 run() 方法。
start():开启线程活动
join():等待线程中止,阻塞当前线程直到被调用join方法的线程中止。线程A调用线程B的join方法,那线程A将会被阻塞至线程B中止。
isAlive():返回线程是否还活动
getName():获取线程名字
setName():设置线程名字
Lock对象:实例化线程锁,包含acquire方法获取锁 和 release 方法释放锁,在最开始创建锁的时候,锁为未锁定状态,调用acquire方法后锁置为锁定状态,此时其他线程再调用acquire方法就将会被阻塞至其他线程调用release方法释放锁,如果释放一个并未被锁定的锁将会抛出异常。支持上下文管理协议,直接with lock 无需调用锁定,释放方法
Rlock对象:重入锁,相比lock增加了线程和递归的概念。比如:线程目标函数F,在获得锁之后执行函数G,但函数G也需要先获得锁,此时同一线程,F获得锁,G等待,F等待G执行,就造成了死锁,此时使用rlock可避免。一旦线程获得了重入锁,同一个线程再次获取它将不阻塞;但线程必须在每次获取它时释放一次。
daemon属性:设置该线程是否是守护线程,默认为none,需要在调用start方法之前设置好
事件对象:一个线程发出事件信号 ,其他线程收到信号后作出对应活动。实例化事件对象后,初始事件标志为flase。调用其wait方法将阻塞当前所属线程,至事件标志为true时。调用set方法可将事件标志置为true,被阻塞的线程将被执行。调用clear方法可将事件标志置为flase
注意点:
1、继承threading.Thread类,初始化时要记得继承父类的__init__方法
2、run()方法只能有一个入参,故尽量把启动线程时的参数入参到初始化的时候
3、锁要设定全局的,一个子线程获得一个锁没有意义
以下实例:有一个列表,线程A从尾到头遍历元素,线程B从头到尾将元素值重置为1,设置线程锁之前线程A遍历到头部的数据已经被修改,设置线程锁之后不会再有数据不一致的情况
import threading,time
class tt(threading.Thread):
def __init__(self,name,func,ll):
threading.Thread.__init__(self) #继承父级的初始化方法
self.name=name
self.func=func #run方法只能带一个入参,故把方法入参到初始化的时候
self.ll=ll
def run(self):
print(self.name)
threadlock.acquire() #获得锁
self.func(self.ll)
threadlock.release() #释放锁
def readd(x):
a=len(x)
while a>0:
print(x[a-1])
a-=1
def sett(x):
for i in range(len(x)):
x[i]=1
print(x)
if __name__=="__main__":
l = [0,0,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]
threadlock=threading.Lock() #实例化全局锁
th1=tt("read",readd,l)
th2=tt("set",sett,l)
th1.start()
th2.start()
th_list=[]
th_list.append(th1)
th_list.append(th2)
for li in th_list:
li.join() #主线程被阻塞,直到两个子线程处理结束
print("主线程结束")
2、队列
queue模块包含queue.Queue(maxsize=0)先入先出队列,queue.LifoQueue()先入后出队列,和queue.PriorityQueue()优先级可设置的队列
Queue 模块中的常用方法:
Queue.qsize() 返回队列的大小,获取的数据不可靠,因为一直有线程在 *** 作队列,数据一直变化
Queue.empty() 如果队列为空,返回True,反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.full 与 maxsize 大小对应
Queue.put(block=true,timeout=none) 将item数据写入队列,block=True,设置线程是否阻塞,设置阻塞当队列数据满了之后就会阻塞,一直到队列数据不满时继续添加,如果设置不阻塞,当队列满了就会一直到timeout到后报错
Queue.get([block[, timeout]]) 取出队列数据,block=True,设置线程是否阻塞。设置阻塞,将会等待直到队列不为空有数据可取出,设置不阻塞直到超过timeout等待时间后报错
Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
Queue.join() 实际上意味着等到队列为空,再执行别的 *** 作。会在队列有未完成时阻塞,等待队列无未完成的任务,取出数据get()之后还需要配置task_done使用才能让等待队列数-1
import queue,time
import threading
q=queue.Queue(maxsize=5)
def sett():
a=0
while a<20:
q.put(a,True)
print("%d被put"%a)
a+=1
def gett():
time.sleep(1)
while not q.empty(): #只要队列没空,一直取数据
print("%d被取出"%q.get(True))
q.task_done() #取出一次数据,将未完成任务-1,不然使用join方法线程会一直阻塞
if __name__=="__main__":
th1=threading._start_new_thread(sett,()) #不带参数也要传入空元祖不然会报错
th2=threading._start_new_thread(gett,())
time.sleep(1) #延时主线程1S,等待put线程已经put部分数据到队列
q.join()#阻塞主线程,直到未完成任务为0
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)