
@[TOC] Python 多进程学习
前言研究回调函数中学习了Python的多进程, 记录备忘
常用的两种方式启动多进程
用multiprocessing 库的Process 或 Pool
感觉Pool比较方便
进程池Pool
from multiprocessing import Process, Pool
import time
import random
import os
def download(f):
print('%s_ID= %s pid=%d,ppid=%d'%(str(time.ctime()),str(f),os.getpid(),os.getppid()))
for i in range(3):
print(f,'--文件--%d'%i)
time.sleep(random.randint(5, 15))
if random.randint(1,9) >= 5:
return {"f:": f,"PID:": os.getpid(),"PPID:": os.getppid(),"time:": time.ctime(),"result": 0, "info": '下载失败!'}
else:
return {"f:": f,"PID:": os.getpid(),"PPID:": os.getppid(),"time:": time.ctime(),"result": 1, "info": '下载完成!'}
if __name__ == "__main__":
p = Pool(2)
result = []
result.append( p.map_async(func=download, iterable=(1111,2222,3333,4444,5555,6666)))
#也可以用apply_async()函数
#result.append( p.apply_async(func=download, args=(1111,)))
#result.append( p.apply_async(func=download, args=(2222,)))
#result.append( p.apply_async(func=download, args=(3333,)))
#result.append( p.apply_async(func=download, args=(4444,)))
#result.append( p.apply_async(func=download, args=(5555,)))
#result.append( p.apply_async(func=download, args=(6666,)))
p.close()#关闭进程池,关闭后,p不再接收新的请求
print(str(time.ctime()) + "---start----")
intCnt = 0
while intCnt < 5:
print("%s: Now intCnt value is: %d sleep for 5s: "%( str(time.ctime()),intCnt))
time.sleep(5)
intCnt +=1
print("%sNow print results"%time.ctime())
for i in range(6,2,-1):
print(result[i-1].get())
print(result[0].get())
print('main process pid: %d, ppid:%d '%(os.getpid(),os.getppid()))
print(str(time.ctime()) + '---end----')
close()函数作用是关闭进程池(关门,不接受新进程加入的请求)而不是关闭进程。关闭进程是terminate(). 进程是在调用apply_async()时就启动了,并不是调用close()函数后才启动的。事实上,没有close()也不影响读取map_async()和apply_async()的返回值,会阻塞主进程。map_async() 更适用于调用的处理程序是同一只,但是调用参数不一样的情况,所以批量调用,写法简单;而apply_async()则是一个一个地调用,适合于每次调用的处理程序可能不太一样的情况。实测两个方法在子进程出现错误后,进程都会被利用。只是apply_async()因为是单独处理的,所以正常运行的子程序返回值都可以取到。但是map_async()只要有一个出问题,会导致所有的返回值都取不到。如果没有其它阻塞主进程的动作,又想确保子进程能执行完毕,可以用join()函数来阻塞主进程。进程之间(包括主进程与子进程) 是独立的,所以不能在子进程中读取或设置主进程中的变量(反之亦然).map_async()和apply_async()都可以有optional的callback参数,可以指定进程结束后的回调函数。也可以理解为上例中的download函数也是回调函数. 注意,map_async()和apply_async()的返回值是由func参数指定的函数返回的,而不是由callback参数返回的。
from multiprocessing import Process,Pool
import os
import time
from datetime import datetime
import random
def callback_setFlag(f):
print('now this thread starts')
print('Income parameter is:{}',f)
keyboard.add_hotkey('ctrl+shift+a', lambda: print('test'))
keyboard.wait('ctrl+alt+q')
return {'result':0, 'Information': 'user pressed key to stop!'}
def alterUser(msg):
print(r"I'm messager: " + msg['Information'])
return msg['result']
def setStopFlag():
global blStop
print(r"Hahahaha, I'll let you stop!!!")
blStop = True
if __name__ == '__main__':
blStop = False
p = Pool(1)
result = p.apply_async(func=callback_setFlag, args=('thread1',), callback=alterUser)
p.close()
keyboard.add_hotkey('ctrl+shift+t',setStopFlag)
print('----------Start-----------------')
intCnt = 0
while intCnt < 10 and blStop == False:
print( "%s Print result from loop in main process, intCnt is: %d"%(time.ctime(),intCnt))
time.sleep(5)
print("now wake up!")
intCnt +=1
if blStop == False:
print( 'Timeout!')
else:
print( 'End: stopped by user!')
print(result.get())
#p.join()
print('----------End-----------------')
进程间通信
注意, 对于Pool,Queues要用mutiprocessing.Manager().Queue(), 不能直接用mutiprocessing.Queue()
from multiprocessing import Process,Pool,Pipe,Queue,Manager
import os
import time
from datetime import datetime
import random
import itertools
def callback_setFlag(q,f):
print('now this thread starts:%d'%f)
keyboard.add_hotkey('ctrl+shift+a', lambda: q.put(str(f) + ' send message: hot key pressed!'))
keyboard.wait('ctrl+alt+q')
q.put('%d send message: user decided to quit! Now try to read queue.')
if not(q.empty()):
print("%d read queue: %s" %(f, q.get(True, 2)))
else:
print("%d read queue: no message in queue!"%f)
return {'result':0, 'Information': 'user pressed key to stop!'}
def download(q,f):
print('%s__进程池中的进程——pid=%d,ppid=%d'%(str(time.ctime()),os.getpid(),os.getppid()))
for i in range(3):
q.put('%d send message: now downloadfile %d'%(f,i))
if not(q.empty()):
print("%d read queue: %s" %(f, q.get(True, 2)))
print(f,'--文件--%d'%i)
time.sleep(random.randint(5, 25))
# time.sleep(1)
if random.randint(1,9) >= 5:
return {"f:": f,"PID:": os.getpid(),"PPID:": os.getppid(),"time:": time.ctime(),"result": 0, "info": '下载失败!'}
else:
return {"f:": f,"PID:": os.getpid(),"PPID:": os.getppid(),"time:": time.ctime(),"result": 1, "info": '下载完成!'}
def alterUser(msg):
print(r"I'm messager: " + msg['Information'])
return msg['result']
def setFlag():
global blStop
print(r"Hahahaha, I'll let you stop!!!")
blStop = True
if __name__ == '__main__':
blStop = False
p = Pool(3)
#conn1, conn2 = Pipe()
q = Manager().Queue()
result = []
result.append( p.apply_async(func=callback_setFlag, args=(q,0000,)))
for i in (1111,7777, 1111):
result.append(p.apply_async(func = download, args = (q,i,)))
p.close()
keyboard.add_hotkey('ctrl+shift+t',setFlag,args=None,suppress=True)
print('----------Start-----------------')
intCnt = 0
while intCnt < 50 and blStop == False:
print( "%s Print result from loop in main process, intCnt is: %d"%(time.ctime(),intCnt))
q.put(r"From main process: I'm going to sleep now, wait for me!!")
time.sleep(5)
print("now wake up! Read Queue")
if not(q.empty()):
print("Main process read queue: %s" % q.get(True, 2))
else:
print("From main process: no message!")
intCnt +=1
if blStop == False:
print( 'Timeout!')
else:
print( 'End: stopped by user!')
#print(result.get())
#p.join()
print('----------End-----------------')
Queue放进去,只能取出来一次,不是广播模式。如果想在各个进程间互相通信,估计需要设计成令牌模式,取出来,判断一下,如果是给自己的,则处理,不是给自己的,再放回去
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)