python多线程并行计算通过向线程池ThreadPoolExecutor提交任务的实现方法

python多线程并行计算通过向线程池ThreadPoolExecutor提交任务的实现方法,第1张

Python的线程池可以有效地控制系统中并发线程的数量。

当程序中需要创建许多生存期较短的线程执行运算任务时,首先考虑使用线程池。线程池任务启动时会创建出最大线程数参数 max_workers 指定数量的空闲线程,程序只要将执行函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。配合使用 with 关键字实现任务队列完成后自动关闭线程池释放资源。

python-selenium并发执行测试用例(方法一 各模块每一条并发执行)

总执行代码:

# coding=utf-8

import unittest,os,time

import HTMLTestRunner

import threading

import sys

syspathappend('C:/Users/Dell/Desktop/CARE/program')#使用编辑器,要指定当前目录,不然无法执行第20行代码

def creatsuite():

casedir = []

list = oslistdir(ospathdirname(osgetcwd()))#获取当前路径的上一级目录的所有文件夹,这里可以改成绝对路径(要搜索的文件路径)

for xx in list:

if "baidu" in xx:

casedirappend(xx)

suite =[]

for n in casedir:

testunit = unittestTestSuite()

unittestdefaultTestLoader_top_level_dir = None

#(unittestdefaultTestLoader(): defaultTestLoader()类,通过该类下面的discover()方法可自动更具测试目录start_dir匹配查找测试用例文件(testpy),

并将查找到的测试用例组装到测试套件,因此可以直接通过run()方法执行discover)

discover = unittestdefaultTestLoaderdiscover(str(n),pattern='tet_py',top_level_dir=None)

for test_suite in discover:

for test_case in test_suite:

testunitaddTests(test_case)

suiteappend(testunit)

return suite, casedir

def runcase(suite,casedir):

lastPath = ospathdirname(osgetcwd())#获取当前路径的上一级

resultDir = lastPath+"\\run\\report\\" #报告存放路径

now = timestrftime("%Y-%m-%d %H%M%S",timelocaltime())

filename = resultDir + now +" resulthtml"

fp = file(filename, 'wb')

proclist=[]

s=0

for i in suite:

runner = HTMLTestRunnerHTMLTestRunner(stream=fp,title=str(casedir[s])+u'测试报告',description=u'用例执行情况:')

proc = threadingThread(target=runnerrun,args=(i,))

proclistappend(proc)

s=s+1

for proc in proclist:

procstart()

for proc in proclist:

procjoin()

fpclose()

if __name__ == "__main__":

runtmp=creatsuite()

runcase(runtmp[0],runtmp[1])

Python提供了非常好用的多进程包multiprocessing,你只需要定义一个函数,Python会替你完成其他所有事情。

借助这个包,可以轻松完成从单进程到并发执行的转换。

1、新建单一进程

如果我们新建少量进程,可以如下:

import multiprocessing

import time

def func(msg):

for i in xrange(3):

print msg

timesleep(1)

if __name__ == "__main__":

p = multiprocessingProcess(target=func, args=("hello", ))

pstart()

pjoin()

print "Sub-process done"12345678910111213

2、使用进程池

是的,你没有看错,不是线程池。它可以让你跑满多核CPU,而且使用方法非常简单。

注意要用apply_async,如果落下async,就变成阻塞版本了。

processes=4是最多并发进程数量。

import multiprocessing

import time

def func(msg):

for i in xrange(3):

print msg

timesleep(1)

if __name__ == "__main__":

pool = multiprocessingPool(processes=4)

for i in xrange(10):

msg = "hello %d" %(i)

poolapply_async(func, (msg, ))

poolclose()

pooljoin()

print "Sub-process(es) done"12345678910111213141516

3、使用Pool,并需要关注结果

更多的时候,我们不仅需要多进程执行,还需要关注每个进程的执行结果,如下:

import multiprocessing

import time

def func(msg):

for i in xrange(3):

print msg

timesleep(1)

return "done " + msg

if __name__ == "__main__":

pool = multiprocessingPool(processes=4)

result = []

for i in xrange(10):

msg = "hello %d" %(i)

resultappend(poolapply_async(func, (msg, )))

poolclose()

pooljoin()

for res in result:

print resget()

print "Sub-process(es) done"1234567891011121314151617181920

20141225更新

根据网友评论中的反馈,在Windows下运行有可能崩溃(开启了一大堆新窗口、进程),可以通过如下调用来解决:

multiprocessingfreeze_support()1

附录(自己的脚本):

#!/usr/bin/python

import threading

import subprocess

import datetime

import multiprocessing

def dd_test(round, th):

test_file_arg = 'of=/zbkc/test_mds_crash/1m_%s_%s_{}' %(round, th)

command = "seq 100 | xargs -i dd if=/dev/zero %s bs=1M count=1" %test_file_arg

print command

subprocesscall(command,shell=True,stdout=open('/dev/null','w'),stderr=subprocessSTDOUT)

def mds_stat(round):

p = subprocessPopen("zbkc mds stat", shell = True, stdout = subprocessPIPE)

out = pstdoutreadlines()

if out[0]find('active') != -1:

command = "echo '0205pm %s round mds status OK, %s' >> /round_record" %(round, datetimedatetimenow())

command_2 = "time (ls /zbkc/test_mds_crash/) 2>>/round_record"

command_3 = "ls /zbkc/test_mds_crash | wc -l >> /round_record"

subprocesscall(command,shell=True)

subprocesscall(command_2,shell=True)

subprocesscall(command_3,shell=True)

return 1

else:

command = "echo '0205 %s round mds status abnormal, %s, %s' >> /round_record" %(round, out[0], datetimedatetimenow())

subprocesscall(command,shell=True)

return 0

#threads = []

for round in range(1, 1600):

pool = multiprocessingPool(processes = 10) #使用进程池

for th in range(10):

# th_name = "thread-" + str(th)

# threadsappend(th_name) #添加线程到线程列表

# threadingThread(target = dd_test, args = (round, th), name = th_name)start() #创建多线程任务

poolapply_async(dd_test, (round, th))

poolclose()

pooljoin()

#等待线程完成

# for t in threads:

# tjoin()

if mds_stat(round) == 0:

subprocesscall("zbkc -s",shell=True)

break

并发和并行

你吃饭吃到一半,电话来了,你一直到吃完了以后才去接,这就说明你不支持并发也不支持并行。

你吃饭吃到一半,电话来了,你停了下来接了电话,接完后继续吃饭,这说明你支持并发。

你吃饭吃到一半,电话来了,你一边打电话一边吃饭,这说明你支持并行。

并发的关键是你有处理多个任务的能力,不一定要同时。

并行的关键是你有同时处理多个任务的能力。

所以我认为它们最关键的点就是:是否是『同时』。

Python 中没有真正的并行,只有并发

无论你的机器有多少个CPU, 同一时间只有一个Python解析器执行。这也和大部分解释型语言一致, 都不支持并行。这应该是python设计的先天缺陷。

javascript也是相同的道理, javascript早起的版本只支持单任务,后来通过worker来支持并发。

Python中的多线程

先复习一下进程和线程的概念

所谓进程,简单的说就是一段程序的动态执行过程,是系统进行资源分配和调度的一个基本单位。一个进程中又可以包含若干个独立的执行流,我们将这些执行流称为线程,线程是CPU调度和分配的基本单位。同一个进程的线程都有自己的专有寄存器,但内存等资源是共享的。

这里有一个更加形象的解释, 出自阮一峰大神的杰作:

>

某个时间段内,数据涌来,这就是并发。如果数据量很大,就是高并发

高并发的解决方法:

1、队列、缓冲区

假设只有一个窗口,陆续涌入食堂的人,排队打菜是比较好的方式

所以,排队(队列)是一种天然解决并发的办法

排队就是把人排成 队列,先进先出,解决了资源使用的问题

排成的队列,其实就是一个缓冲地带,就是 缓冲区

假设女生优先,每次都从这个队伍中优先选出女生出来先打饭,这就是 优先队列

例如queue模块的类Queue、LifoQueue、PriorityQueue(小顶堆实现)

2、争抢

只开一个窗口,有可能没有秩序,也就是谁挤进去就给谁打饭

挤到窗口的人占据窗口,直到打到饭菜离开

其他人继续争抢,会有一个人占据着窗口,可以视为锁定窗口,窗口就不能为其他人提供服务了。

这是一种锁机制

谁抢到资源就上锁,排他性的锁,其他人只能等候

争抢也是一种高并发解决方案,但是,这样可能不好,因为有可能有人很长时间抢不到

3、预处理

如果排长队的原因,是由于每个人打菜等候时间长,因为要吃的菜没有,需要现做,没打着饭不走开,锁定着窗口

食堂可以提前统计大多数人最爱吃的菜品,将最爱吃的80%的热门菜,提前做好,保证供应,20%的冷门菜,现做

这样大多数人,就算锁定窗口,也很快打到饭菜走了,快速释放窗口

一种提前加载用户需要的数据的思路,预处理 思想,缓存常用

更多Python知识,请关注:Python自学网!!

第一个就是并发本身所带来的开销即新开处理线程、关闭处理线程、多个处理线程时间片轮转所带来的开销。

实际上对于一些逻辑不那么复杂的场景来说这些开销甚至比真正的处理逻辑部分代码的开销更大。所以我们决定采用基于协程的并发方式,即服务进程只有一个(单cpu)所有的请求数据都由这个服务进程内部来维护,同时服务进程自行调度不同请求的处理顺序,这样避免了传统多线程并发方式新建、销毁以及系统调度处理线程的开销。基于这样的考虑我们选择了基于Tornado框架实现api服务的开发。Tornado的实现非常简洁明了,使用python的生成器作为协程,利用IOLoop实现了调度队列。

第二个问题是数据库的性能,这里说的数据库包括MongoDB和Redis,我这里分开讲。

先讲MongoDB的问题,MongoDB主要存储不同的用户对于验证的不同设置,比如该显示什么样的。

一开始每次验证请求都会查询MongoDB,当时我们的MongoDB是纯内存的,同时三台机器组成一个复制集,这样的组合大概能稳定承载八九千的qps,后来随着我们验证量越来越大,这个承载能力逐渐就成为了我们的瓶颈。

为了彻底搞定这个问题,我们提出了最极端的解决方案,干脆直接把数据库中的数据完全缓存到服务进程里定期批量更新,这样查询的开销将大大降低。但是因为我们用的是Python,由于GIL的存在,在8核服务器上会fork出来8个服务进程,进程之间不像线程那么方便,所以我们基于mmap自己写了一套伙伴算法构建了一个跨进程共享缓存。自从这套缓存上线之后,Mongodb的负载几乎变成了零。

说完了MongoDB再说Redis的问题,Redis代码简洁、数据结构丰富、性能强大,唯一的问题是作为一个单进程程序,终究性能是有上限的。

虽然今年Redis发布了官方的集群版本,但是经过我们的测试,认为这套分布式方案的故障恢复时间不够优秀并且运维成本较高。在Redis官方集群方案面世之前,开源世界有不少proxy方案,比如Twtter的TwemProxy和豌豆荚的Codis。这两种方案测试完之后给我们的感觉TwemProxy运维还是比较麻烦,Codis使用起来让人非常心旷神怡,无论是修改配置还是扩容都可以在配置页面上完成,并且性能也还算不错,但无奈当时Codis还有比较严重的BUG只能放弃之。

几乎尝试过各种方案之后,我们还是下决心自己实现一套分布式方案,目的是高度贴合我们的需求并且运维成本要低、扩容要方便、故障切换要快最重要的是数据冗余一定要做好。

基于上面的考虑,我们确定基于客户端的分布式方案,通过zookeeper来同步状态保证高可用。具体来说,我们修改Redis源码,使其向zookeeper注册,客户端由zookeeper上获取Redis服务器集群信息并根据统一的一致性哈希算法来计算数据应该存储在哪台Redis上,并在哈希环的下一台Redis上写入一份冗余数据,当读取原始数据失败时可以立即尝试读取冗余数据而不会造成服务中断。

以上就是关于python多线程并行计算通过向线程池ThreadPoolExecutor提交任务的实现方法全部的内容,包括:python多线程并行计算通过向线程池ThreadPoolExecutor提交任务的实现方法、python用例并发怎么解决、python 多进程是真的并发吗等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/zz/9796820.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-05-02
下一篇2023-05-02

发表评论

登录后才能评论

评论列表(0条)

    保存