
#!/usr/bin/env python"""Start process; wait 2 seconds; kill the process; print all process output."""import subprocessimport tempfileimport timedef main(): # open temporary file (it automatically deleted when it is closed) # `Popen` requires `f.fileno()` so `SpooledTemporaryFile` adds nothing here f = tempfile.TemporaryFile() # start process, redirect stdout p = subprocess.Popen(["top"], stdout=f) # wait 2 seconds time.sleep(2) # kill process #NOTE: if it doesn't kill the process then `p.wait()` blocks forever p.terminate() p.wait() # wait for the process to terminate otherwise the output is garbled # print saved output f.seek(0) # rewind to the beginning of the file print f.read(), f.close()if __name__=="__main__": main()
类似于尾巴的解决方案,仅打印输出的一部分
你可以在另一个线程中读取过程输出,并将所需数量的最后几行保存在队列中:
import collectionsimport subprocessimport timeimport threadingdef read_output(process, append): for line in iter(process.stdout.readline, ""): append(line)def main(): # start process, redirect stdout process = subprocess.Popen(["top"], stdout=subprocess.PIPE, close_fds=True) try: # save last `number_of_lines` lines of the process output number_of_lines = 200 q = collections.deque(maxlen=number_of_lines) # atomic .append() t = threading.Thread(target=read_output, args=(process, q.append)) t.daemon = True t.start() # time.sleep(2) finally: process.terminate() #NOTE: it doesn't ensure the process termination # print saved lines print ''.join(q)if __name__=="__main__": main()
此变体必须
q.append()是原子 *** 作。否则,输出可能会损坏。
signal.alarm()解
你可以用来 在指定的超时后
signal.alarm()调用,
process.terminate()而不用读入另一个线程。尽管它可能无法与
subprocess模块很好地交互。基于
@Alex Martelli的答案:
import collectionsimport signalimport subprocessclass Alarm(Exception): passdef alarm_handler(signum, frame): raise Alarmdef main(): # start process, redirect stdout process = subprocess.Popen(["top"], stdout=subprocess.PIPE, close_fds=True) # set signal handler signal.signal(signal.SIGALRM, alarm_handler) signal.alarm(2) # produce SIGALRM in 2 seconds try: # save last `number_of_lines` lines of the process output number_of_lines = 200 q = collections.deque(maxlen=number_of_lines) for line in iter(process.stdout.readline, ""): q.append(line) signal.alarm(0) # cancel alarm except Alarm: process.terminate() finally: # print saved lines print ''.join(q)if __name__=="__main__": main()
该方法仅适用于
* nix系统。如果
process.stdout.readline()不返回,它可能会阻塞。
threading.Timer 解import collectionsimport subprocessimport threadingdef main(): # start process, redirect stdout process = subprocess.Popen(["top"], stdout=subprocess.PIPE, close_fds=True) # terminate process in timeout seconds timeout = 2 # seconds timer = threading.Timer(timeout, process.terminate) timer.start() # save last `number_of_lines` lines of the process output number_of_lines = 200 q = collections.deque(process.stdout, maxlen=number_of_lines) timer.cancel() # print saved lines print ''.join(q),if __name__=="__main__": main()
这种方法在Windows上也应适用。在这里,我用作
process.stdout迭代。它可能会引入额外的输出缓冲,
iter(process.stdout.readline, “”)如果不希望的话,你可以切换到该方法。如果进程没有终止,
process.terminate()则脚本挂起。
没有线程,没有信号解决方案
import collectionsimport subprocessimport sysimport timedef main(): args = sys.argv[1:] if not args: args = ['top'] # start process, redirect stdout process = subprocess.Popen(args, stdout=subprocess.PIPE, close_fds=True) # save last `number_of_lines` lines of the process output number_of_lines = 200 q = collections.deque(maxlen=number_of_lines) timeout = 2 # seconds now = start = time.time() while (now - start) < timeout: line = process.stdout.readline() if not line: break q.append(line) now = time.time() else: # on timeout process.terminate() # print saved lines print ''.join(q),if __name__=="__main__": main()
此变型既不使用线程,也不使用信号,但会在终端中产生乱码输出。如果
process.stdout.readline()阻塞,它将阻塞。
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)