python多进程实现MapReduce模型下的文档库词频统计功能

python多进程实现MapReduce模型下的文档库词频统计功能,第1张

time1是读入文档的时间,time2是形成完整字典的时间,time3是加入读入文件后的总时间。

通过观察线程数目和运行时间的关系,可以发现线程数少的时候运行时间更短。

按照Nvidia官方教程按照部署TensorRT成功后, 在python环境下多进程启动tensorrt实例时,系统报错:

解决步骤:

参考TensorRTEngine init () 中的selfcfx = cudaDevice(0)make_context(), 同时别忘了在实例释放时detach cuda上下文

参考TensorRTEngineinference()中的selfcfxpush() 与 selfcfxpop() *** 作

参考文章:

>

当我们有一个很长很长的任务队列(mission_list)和阈值对应的一个处理函数(missionFunction)时,我们一般采用如下的方式进行处理:

但是,如果这任务列表很长很长,处理函数很复杂(占用cpu)时,单核往往需要很长的时间进行处理,此时,Multiprocess便可以极大的提高我们程序的运行速度,相关内容请借鉴 multiprocessing --- 基于进程的并行 — Python 3104 文档。

以上这种场景下,推荐大家采用最简单的进程池+map的方法进行处理,标准的写法, chunksize要借鉴官方的说法,最好大一点

但是!!!! 如果我们的任务列表非常的长,这会导致多进程还没跑起来之前,内存已经撑爆了,任务自然没法完成,此时我们有几种办法进行优化:

进程的启动方法有三种,可参考官方文档:

[上传失败(image-48cd3c-1650511153989)]

在linux环境下,使用forkserver可以节省很多的内存空间, 因为进程启动的是一个服务,不会把主进程的数据全部复制

采用imap会极大的节省空间,它返回的是一个迭代器,也就是结果列表:

但注意,以上写法中,你写的结果迭代部分必须写在with下面。或者采用另一种写法:

还有最后一种,当你的mission list实在太大了,导致你在生成 mission list的时候已经把内存撑爆了,这个时候就得优化 mission_list了,如果你的mission_list是通过一个for循环生成的,你可以使用yield字段,将其封装为一个迭代器,传入进程池:

这样子,我们就封装好了mission_list,它是一个可迭代对象,在取数据的时候才会将数据拉到内存

我在项目中结合了后两种方法,原本256G的内存都不够用,但在修改后内存只占用了不到10G。希望能够帮助到你

python里推荐用多进程而不是多线程,但是多进程也有其自己的限制:相比线程更加笨重、切换耗时更长,并且在python的多进程下,进程数量不推荐超过CPU核心数(一个进程只有一个GIL,所以一个进程只能跑满一个CPU),因为一个进程占用一个CPU时能充分利用机器的性能,但是进程多了就会出现频繁的进程切换,反而得不偿失。

不过特殊情况(特指IO密集型任务)下,多线程是比多进程好用的。

举个例子:给你200W条url,需要你把每个url对应的页面抓取保存起来,这种时候,单单使用多进程,效果肯定是很差的。为什么呢?

例如每次请求的等待时间是2秒,那么如下(忽略cpu计算时间):

1、单进程+单线程:需要2秒200W=400W秒==111111个小时==463天,这个速度明显是不能接受的

2、单进程+多线程:例如我们在这个进程中开了10个多线程,比1中能够提升10倍速度,也就是大约463天能够完成200W条抓取,请注意,这里的实际执行是:线程1遇见了阻塞,CPU切换到线程2去执行,遇见阻塞又切换到线程3等等,10个线程都阻塞后,这个进程就阻塞了,而直到某个线程阻塞完成后,这个进程才能继续执行,所以速度上提升大约能到10倍(这里忽略了线程切换带来的开销,实际上的提升应该是不能达到10倍的),但是需要考虑的是线程的切换也是有开销的,所以不能无限的启动多线程(开200W个线程肯定是不靠谱的)

3、多进程+多线程:这里就厉害了,一般来说也有很多人用这个方法,多进程下,每个进程都能占一个cpu,而多线程从一定程度上绕过了阻塞的等待,所以比单进程下的多线程又更好使了,例如我们开10个进程,每个进程里开20W个线程,执行的速度理论上是比单进程开200W个线程快10倍以上的(为什么是10倍以上而不是10倍,主要是cpu切换200W个线程的消耗肯定比切换20W个进程大得多,考虑到这部分开销,所以是10倍以上)。

还有更好的方法吗?答案是肯定的,它就是:

4、协程,使用它之前我们先讲讲what/why/how(它是什么/为什么用它/怎么使用它)

what:

协程是一种用户级的轻量级线程。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

在并发编程中,协程与线程类似,每个协程表示一个执行单元,有自己的本地数据,与其它协程共享全局数据和其它资源。

why:

目前主流语言基本上都选择了多线程作为并发设施,与线程相关的概念是抢占式多任务(Preemptive multitasking),而与协程相关的是协作式多任务。

不管是进程还是线程,每次阻塞、切换都需要陷入系统调用(system call),先让CPU跑 *** 作系统的调度程序,然后再由调度程序决定该跑哪一个进程(线程)。而且由于抢占式调度执行顺序无法确定的特点,使用线程时需要非常小心地处理同步问题,而协程完全不存在这个问题(事件驱动和异步程序也有同样的优点)。

因为协程是用户自己来编写调度逻辑的,对CPU来说,协程其实是单线程,所以CPU不用去考虑怎么调度、切换上下文,这就省去了CPU的切换开销,所以协程在一定程度上又好于多线程。

how:

python里面怎么使用协程?答案是使用gevent,使用方法:看这里

使用协程,可以不受线程开销的限制,我尝试过一次把20W条url放在单进程的协程里执行,完全没问题。

所以最推荐的方法,是多进程+协程(可以看作是每个进程里都是单线程,而这个单线程是协程化的)

多进程+协程下,避开了CPU切换的开销,又能把多个CPU充分利用起来,这种方式对于数据量较大的爬虫还有文件读写之类的效率提升是巨大的。

python的多进程之间无法用全局变量,需要只用队列queen进行通讯。

1 创建。q=multiprocessingQueen(num),num最大存放多少数据

2进程使用队列,需要在创建进程时做为参数传进去。p=multiprocessingProcess(target=fun_name,args=(q,))

3队列使用。队列是先进先出的,pput(任何数据类型),放进数据,当队列满时会进程会堵塞等待。pget()取出数据,当队列中无数据是,进程会堵塞等待。pfull()是否已满,pempty()是否空了。

想要充分利用多核CPU资源,Python中大部分情况下都需要使用多进程,Python中提供了multiprocessing这个包实现多进程。multiprocessing支持子进程、进程间的同步与通信,提供了Process、Queue、Pipe、Lock等组件。

开辟子进程

multiprocessing中提供了Process类来生成进程实例

Process([group [, target [, name [, args [, kwargs]]]]])

group分组,实际上不使用

target表示调用对象,你可以传入方法的名字

args表示给调用对象以元组的形式提供参数,比如target是函数a,他有两个参数m,n,那么该参数为args=(m, n)即可

kwargs表示调用对象的字典

name是别名,相当于给这个进程取一个名字

先来个小例子:

# -- coding:utf-8 --

from multiprocessing import Process, Pool

import os

import time

def run_proc(wTime):

n = 0

while n < 3:

print "subProcess %s run," % osgetpid(), "{0}"format(timectime()) #获取当前进程号和正在运行是的时间

timesleep(wTime) #等待(休眠)

n += 1

if __name__ == "__main__":

p = Process(target=run_proc, args=(2,)) #申请子进程

pstart() #运行进程

print "Parent process run subProcess is ", ppid

print "Parent process end,{0}"format(timectime())

运行结果:

Parent process run subProcess is 30196

Parent process end,Mon Mar 27 11:20:21 2017

subProcess 30196 run, Mon Mar 27 11:20:21 2017

subProcess 30196 run, Mon Mar 27 11:20:23 2017

subProcess 30196 run, Mon Mar 27 11:20:25 2017

根据运行结果可知,父进程运行结束后子进程仍然还在运行,这可能造成僵尸( zombie)进程。

通常情况下,当子进程终结时,它会通知父进程,清空自己所占据的内存,并在内核里留下自己的退出信息。父进程在得知子进程终结时,会从内核中取出子进程的退出信息。但是,如果父进程早于子进程终结,这可能造成子进程的退出信息滞留在内核中,子进程成为僵尸(zombie)进程。当大量僵尸进程积累时,内存空间会被挤占。

有什么办法可以避免僵尸进程呢?

这里介绍进程的一个属性 deamon,当其值为TRUE时,其父进程结束,该进程也直接终止运行(即使还没运行完)。

所以给上面的程序加上pdeamon = true,看看效果。

# -- coding:utf-8 --

from multiprocessing import Process, Pool

import os

import time

def run_proc(wTime):

n = 0

while n < 3:

print "subProcess %s run," % osgetpid(), "{0}"format(timectime())

timesleep(wTime)

n += 1

if __name__ == "__main__":

p = Process(target=run_proc, args=(2,))

pdaemon = True #加入daemon

pstart()

print "Parent process run subProcess is ", ppid

print "Parent process end,{0}"format(timectime())

执行结果:

Parent process run subProcess is 31856

Parent process end,Mon Mar 27 11:40:10 2017

这是问题又来了,子进程并没有执行完,这不是所期望的结果。有没办法将子进程执行完后才让父进程结束呢?

这里引入pjoin()方法,它使子进程执行结束后,父进程才执行之后的代码

# -- coding:utf-8 --

from multiprocessing import Process, Pool

import os

import time

def run_proc(wTime):

n = 0

while n < 3:

print "subProcess %s run," % osgetpid(), "{0}"format(timectime())

timesleep(wTime)

n += 1

if __name__ == "__main__":

p = Process(target=run_proc, args=(2,))

pdaemon = True

pstart()

pjoin() #加入join方法

print "Parent process run subProcess is ", ppid

print "Parent process end,{0}"format(timectime())

执行结果:

subProcess 32076 run, Mon Mar 27 11:46:07 2017

subProcess 32076 run, Mon Mar 27 11:46:09 2017

subProcess 32076 run, Mon Mar 27 11:46:11 2017

Parent process run subProcess is 32076

Parent process end,Mon Mar 27 11:46:13 2017

这样所有的进程就能顺利的执行了。

以上就是关于python多进程实现MapReduce模型下的文档库词频统计功能全部的内容,包括:python多进程实现MapReduce模型下的文档库词频统计功能、Python多进程部署TensorRT的问题及解决、Python 多进程内存占用问题等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/sjk/9324260.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-04-27
下一篇2023-04-27

发表评论

登录后才能评论

评论列表(0条)

    保存