Utilisation du module 'subprocess' avec timeout
voici le code Python pour exécuter une commande arbitraire retournant ses données stdout
, ou soulever une exception sur les codes de sortie non-zéro:
proc = subprocess.Popen(
cmd,
stderr=subprocess.STDOUT, # Merge stdout and stderr
stdout=subprocess.PIPE,
shell=True)
communicate
est utilisé pour attendre que le processus de sortie de:
stdoutdata, stderrdata = proc.communicate()
le module subprocess
ne supporte pas le timeout--capacité de tuer un processus qui tourne pendant plus de X secondes--par conséquent, communicate
peut prendre une éternité à courir.
Qu'est-ce que le le plus simple façon d'implémenter les timeouts dans un programme Python destiné à fonctionner sous Windows et Linux?
27 réponses
En Python 3.3+:
from subprocess import STDOUT, check_output
output = check_output(cmd, stderr=STDOUT, timeout=seconds)
output
est une chaîne de caractères qui contient les données stdout, stderr fusionnées de la commande.
ce code élève CalledProcessError
sur le statut de sortie non-zéro comme spécifié dans le texte de la question contrairement à la méthode proc.communicate()
.
j'ai supprimé shell=True
parce qu'il est souvent utilisé inutilement. Vous pouvez toujours l'ajouter si cmd
l'exige. Si vous ajoutez shell=True
c'est à dire, si le processus de l'enfant produit ses propres descendants; check_output()
peut revenir beaucoup plus tard que le timeout indique, voir subprocess timeout failure .
la fonctionnalité timeout est disponible sur Python 2.x via le subprocess32
backport du module 3.2+ subprocess.
je ne sais pas beaucoup sur les détails de bas niveau; mais, étant donné que, dans python 2.6 L'API offre la possibilité d'attendre les threads et terminer les processus, qu'en est-il de l'exécution du processus dans un séparé thread?
import subprocess, threading
class Command(object):
def __init__(self, cmd):
self.cmd = cmd
self.process = None
def run(self, timeout):
def target():
print 'Thread started'
self.process = subprocess.Popen(self.cmd, shell=True)
self.process.communicate()
print 'Thread finished'
thread = threading.Thread(target=target)
thread.start()
thread.join(timeout)
if thread.is_alive():
print 'Terminating process'
self.process.terminate()
thread.join()
print self.process.returncode
command = Command("echo 'Process started'; sleep 2; echo 'Process finished'")
command.run(timeout=3)
command.run(timeout=1)
La sortie de cet extrait dans ma machine est:
Thread started
Process started
Process finished
Thread finished
0
Thread started
Process started
Terminating process
Thread finished
-15
où il peut être vu que, dans la première exécution, le processus de fini correctement (code de retour 0), tandis que dans le second, la processus a été interrompu (retourner le code -15).
Je n'ai pas testé dans windows; mais, mis à part la mise à jour de l'exemple command, je pense que ça devrait marcher puisque je n'ai pas trouvé documentation Tout ce qui indique ce fil.rejoindre ou de processus.résilier n'est pas pris en charge.
la réponse de jcollado peut être simplifiée en utilisant le filetage .Minuterie classe:
import shlex
from subprocess import Popen, PIPE
from threading import Timer
def run(cmd, timeout_sec):
proc = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE)
timer = Timer(timeout_sec, proc.kill)
try:
timer.start()
stdout, stderr = proc.communicate()
finally:
timer.cancel()
# Examples: both take 1 second
run("sleep 1", 5) # process ends normally at 1 second
run("sleep 5", 1) # timeout happens at 1 second
si vous êtes sous Unix,
import signal
...
class Alarm(Exception):
pass
def alarm_handler(signum, frame):
raise Alarm
signal.signal(signal.SIGALRM, alarm_handler)
signal.alarm(5*60) # 5 minutes
try:
stdoutdata, stderrdata = proc.communicate()
signal.alarm(0) # reset the alarm
except Alarm:
print "Oops, taking too long!"
# whatever else
Voici la solution D'Alex Martelli en tant que module avec un procédé de destruction appropriée. Les autres approches ne fonctionnent pas parce qu'elles n'utilisent pas proc.communiquer.)( Donc, si vous avez un processus qui produit beaucoup de sortie, il remplira son tampon de sortie et bloquer jusqu'à ce que vous lisez quelque chose d'elle.
from os import kill
from signal import alarm, signal, SIGALRM, SIGKILL
from subprocess import PIPE, Popen
def run(args, cwd = None, shell = False, kill_tree = True, timeout = -1, env = None):
'''
Run a command with a timeout after which it will be forcibly
killed.
'''
class Alarm(Exception):
pass
def alarm_handler(signum, frame):
raise Alarm
p = Popen(args, shell = shell, cwd = cwd, stdout = PIPE, stderr = PIPE, env = env)
if timeout != -1:
signal(SIGALRM, alarm_handler)
alarm(timeout)
try:
stdout, stderr = p.communicate()
if timeout != -1:
alarm(0)
except Alarm:
pids = [p.pid]
if kill_tree:
pids.extend(get_process_children(p.pid))
for pid in pids:
# process might have died before getting to this line
# so wrap to avoid OSError: no such process
try:
kill(pid, SIGKILL)
except OSError:
pass
return -9, '', ''
return p.returncode, stdout, stderr
def get_process_children(pid):
p = Popen('ps --no-headers -o pid --ppid %d' % pid, shell = True,
stdout = PIPE, stderr = PIPE)
stdout, stderr = p.communicate()
return [int(p) for p in stdout.split()]
if __name__ == '__main__':
print run('find /', shell = True, timeout = 3)
print run('find', shell = True)
j'ai modifié sussudio réponse. Maintenant la fonction retourne: ( returncode
, stdout
, stderr
, timeout
) - stdout
et stderr
est décodé en chaîne utf-8
def kill_proc(proc, timeout):
timeout["value"] = True
proc.kill()
def run(cmd, timeout_sec):
proc = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
timeout = {"value": False}
timer = Timer(timeout_sec, kill_proc, [proc, timeout])
timer.start()
stdout, stderr = proc.communicate()
timer.cancel()
return proc.returncode, stdout.decode("utf-8"), stderr.decode("utf-8"), timeout["value"]
surpris personne n'a mentionné l'utilisation de timeout
timeout 5 ping -c 3 somehost
ce ne sera pas pour le travail pour chaque cas d'utilisation évidemment, mais si vous avez affaire à un script simple, c'est difficile à battre.
également disponible en gtimeout dans coreutils via homebrew
pour les utilisateurs mac.
une autre option consiste à écrire dans un fichier temporaire pour empêcher le blocage de stdout au lieu d'avoir besoin de Poller avec communicate(). Cela a fonctionné pour moi là où les autres réponses n'ont pas fonctionné; par exemple sur windows.
outFile = tempfile.SpooledTemporaryFile()
errFile = tempfile.SpooledTemporaryFile()
proc = subprocess.Popen(args, stderr=errFile, stdout=outFile, universal_newlines=False)
wait_remaining_sec = timeout
while proc.poll() is None and wait_remaining_sec > 0:
time.sleep(1)
wait_remaining_sec -= 1
if wait_remaining_sec <= 0:
killProc(proc.pid)
raise ProcessIncompleteError(proc, timeout)
# read temp streams from start
outFile.seek(0);
errFile.seek(0);
out = outFile.read()
err = errFile.read()
outFile.close()
errFile.close()
timeout
est maintenant supporté par call()
et communicate()
dans le module de sous-processus (à partir de Python3.3):
import subprocess
subprocess.call("command", timeout=20, shell=True)
cela appellera la commande et soulèvera l'exception
subprocess.TimeoutExpired
si la commande ne se termine pas après 20 secondes.
vous pouvez alors gérer l'exception pour continuer votre code, quelque chose comme:
try:
subprocess.call("command", timeout=20, shell=True)
except subprocess.TimeoutExpired:
# insert code here
Espérons que cette aide.
voici ma solution, j'ai utilisé le fil et L'événement:
import subprocess
from threading import Thread, Event
def kill_on_timeout(done, timeout, proc):
if not done.wait(timeout):
proc.kill()
def exec_command(command, timeout):
done = Event()
proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
watcher = Thread(target=kill_on_timeout, args=(done, timeout, proc))
watcher.daemon = True
watcher.start()
data, stderr = proc.communicate()
done.set()
return data, stderr, proc.returncode
en action:
In [2]: exec_command(['sleep', '10'], 5)
Out[2]: ('', '', -9)
In [3]: exec_command(['sleep', '10'], 11)
Out[3]: ('', '', 0)
la solution que j'utilise est de préfixer la commande shell par timelimit . Si la commande prend trop de temps, timelimit l'arrêtera et Popen aura un code de retour défini par timelimit. Si c'est > 128, cela signifie délai tué le processus.
Voir aussi python sous-processus avec délai d'attente et de grande puissance (>64 KO)
j'ai ajouté la solution avec filetage de jcollado
à mon module Python easyprocess .
Installation:
pip install easyprocess
exemple:
from easyprocess import Proc
# shell is not supported!
stdout=Proc('ping localhost').call(timeout=1.5).stdout
print stdout
si vous utilisez python 2, Essayez
import subprocess32
try:
output = subprocess32.check_output(command, shell=True, timeout=3)
except subprocess32.TimeoutExpired as e:
print e
j'ai mis en œuvre ce que j'ai pu en tirer. Cela fonctionne sous Windows, et puisqu'il s'agit d'un wiki communautaire, je me suis dit que je partagerais aussi mon code:
class Command(threading.Thread):
def __init__(self, cmd, outFile, errFile, timeout):
threading.Thread.__init__(self)
self.cmd = cmd
self.process = None
self.outFile = outFile
self.errFile = errFile
self.timed_out = False
self.timeout = timeout
def run(self):
self.process = subprocess.Popen(self.cmd, stdout = self.outFile, \
stderr = self.errFile)
while (self.process.poll() is None and self.timeout > 0):
time.sleep(1)
self.timeout -= 1
if not self.timeout > 0:
self.process.terminate()
self.timed_out = True
else:
self.timed_out = False
puis d'une autre classe ou fichier:
outFile = tempfile.SpooledTemporaryFile()
errFile = tempfile.SpooledTemporaryFile()
executor = command.Command(c, outFile, errFile, timeout)
executor.daemon = True
executor.start()
executor.join()
if executor.timed_out:
out = 'timed out'
else:
outFile.seek(0)
errFile.seek(0)
out = outFile.read()
err = errFile.read()
outFile.close()
errFile.close()
une fois que vous avez compris le fonctionnement complet des machines dans *unix, vous trouverez facilement la solution plus simple:
considérons cet exemple simple comment faire communiquer timeoutable() meth en utilisant select.select () (disponible presque partout sur *nix de nos jours). Cela peut aussi être écrit avec epoll/poll/kqueue, mais select.sélectionnez() variante pourrait être un bon exemple pour vous. Et les principales limites de sélectionner.select () (speed et 1024 max fds) ne sont pas applicables à votre tâche.
cela fonctionne sous *nix, ne crée pas de threads, n'utilise pas de signaux, peut être lauched à partir de n'importe quel thread (pas seulement principal), et rapide enought pour lire 250mb/s de données de stdout sur ma machine (i5 2,3 ghz).
il y a un problème à joindre stdout/stderr à la fin de communiquer. Si vous avez une sortie de programme énorme cela pourrait conduire à une grande utilisation de la mémoire. Mais vous pouvez appeler communicate() plusieurs fois avec des timeouts plus petits.
class Popen(subprocess.Popen):
def communicate(self, input=None, timeout=None):
if timeout is None:
return subprocess.Popen.communicate(self, input)
if self.stdin:
# Flush stdio buffer, this might block if user
# has been writing to .stdin in an uncontrolled
# fashion.
self.stdin.flush()
if not input:
self.stdin.close()
read_set, write_set = [], []
stdout = stderr = None
if self.stdin and input:
write_set.append(self.stdin)
if self.stdout:
read_set.append(self.stdout)
stdout = []
if self.stderr:
read_set.append(self.stderr)
stderr = []
input_offset = 0
deadline = time.time() + timeout
while read_set or write_set:
try:
rlist, wlist, xlist = select.select(read_set, write_set, [], max(0, deadline - time.time()))
except select.error as ex:
if ex.args[0] == errno.EINTR:
continue
raise
if not (rlist or wlist):
# Just break if timeout
# Since we do not close stdout/stderr/stdin, we can call
# communicate() several times reading data by smaller pieces.
break
if self.stdin in wlist:
chunk = input[input_offset:input_offset + subprocess._PIPE_BUF]
try:
bytes_written = os.write(self.stdin.fileno(), chunk)
except OSError as ex:
if ex.errno == errno.EPIPE:
self.stdin.close()
write_set.remove(self.stdin)
else:
raise
else:
input_offset += bytes_written
if input_offset >= len(input):
self.stdin.close()
write_set.remove(self.stdin)
# Read stdout / stderr by 1024 bytes
for fn, tgt in (
(self.stdout, stdout),
(self.stderr, stderr),
):
if fn in rlist:
data = os.read(fn.fileno(), 1024)
if data == '':
fn.close()
read_set.remove(fn)
tgt.append(data)
if stdout is not None:
stdout = ''.join(stdout)
if stderr is not None:
stderr = ''.join(stderr)
return (stdout, stderr)
vous pouvez le faire en utilisant select
import subprocess
from datetime import datetime
from select import select
def call_with_timeout(cmd, timeout):
started = datetime.now()
sp = subprocess.Popen(cmd, stdout=subprocess.PIPE)
while True:
p = select([sp.stdout], [], [], timeout)
if p[0]:
p[0][0].read()
ret = sp.poll()
if ret is not None:
return ret
if (datetime.now()-started).total_seconds() > timeout:
sp.kill()
return None
Je ne sais pas pourquoi il n'est pas mentionné mais depuis Python 3.5, il y a un nouveau subprocess.run
commande universelle (qui est destiné à remplacer check_call
, check_output
...) et qui a aussi le paramètre timeout
.
sous-processus.exécuter(args, *, stdin=None, entrée=None, stdout=None, stderr=None, shell=False, la mdc=None, timeout=None, vérifiez=False, encoding=None, des erreurs=None)
Run the command described by args. Wait for command to complete, then return a CompletedProcess instance.
Il soulève un subprocess.TimeoutExpired
exception lorsque le délai est expiré.
j'ai utilisé killableprocess avec succès sur Windows, Linux et Mac. Si vous utilisez Cygwin Python, vous aurez besoin de la version D'OSAF de killableprocess car sinon les processus Windows natifs ne seront pas tués.
bien que je ne l'ai pas beaucoup regardé, ce décorateur que j'ai trouvé à ActiveState semble être très utile pour ce genre de chose. Avec subprocess.Popen(..., close_fds=True)
, au moins je suis prêt pour les scripts shell en Python.
il y a une idée de sous-classe la classe Popen et de l'étendre avec quelques décorateurs méthode simple. Appelons-le "ExpirablePopen".
from logging import error
from subprocess import Popen
from threading import Event
from threading import Thread
class ExpirablePopen(Popen):
def __init__(self, *args, **kwargs):
self.timeout = kwargs.pop('timeout', 0)
self.timer = None
self.done = Event()
Popen.__init__(self, *args, **kwargs)
def __tkill(self):
timeout = self.timeout
if not self.done.wait(timeout):
error('Terminating process {} by timeout of {} secs.'.format(self.pid, timeout))
self.kill()
def expirable(func):
def wrapper(self, *args, **kwargs):
# zero timeout means call of parent method
if self.timeout == 0:
return func(self, *args, **kwargs)
# if timer is None, need to start it
if self.timer is None:
self.timer = thr = Thread(target=self.__tkill)
thr.daemon = True
thr.start()
result = func(self, *args, **kwargs)
self.done.set()
return result
return wrapper
wait = expirable(Popen.wait)
communicate = expirable(Popen.communicate)
if __name__ == '__main__':
from subprocess import PIPE
print ExpirablePopen('ssh -T git@bitbucket.org', stdout=PIPE, timeout=1).communicate()
j'ai eu le problème que je voulais terminer un sous-processus de multithreading si cela prenait plus de temps qu'une longueur de délai donnée. Je voulais mettre un temps mort dans Popen()
, mais ça n'a pas marché. Puis, j'ai réalisé que Popen().wait()
est égal à call()
et j'ai donc eu l'idée de fixer un délai dans le cadre de la méthode .wait(timeout=xxx)
, qui a finalement fonctionné. Ainsi, je l'ai résolu de cette façon:
import os
import sys
import signal
import subprocess
from multiprocessing import Pool
cores_for_parallelization = 4
timeout_time = 15 # seconds
def main():
jobs = [...YOUR_JOB_LIST...]
with Pool(cores_for_parallelization) as p:
p.map(run_parallel_jobs, jobs)
def run_parallel_jobs(args):
# Define the arguments including the paths
initial_terminal_command = 'C:\Python34\python.exe' # Python executable
function_to_start = 'C:\temp\xyz.py' # The multithreading script
final_list = [initial_terminal_command, function_to_start]
final_list.extend(args)
# Start the subprocess and determine the process PID
subp = subprocess.Popen(final_list) # starts the process
pid = subp.pid
# Wait until the return code returns from the function by considering the timeout.
# If not, terminate the process.
try:
returncode = subp.wait(timeout=timeout_time) # should be zero if accomplished
except subprocess.TimeoutExpired:
# Distinguish between Linux and Windows and terminate the process if
# the timeout has been expired
if sys.platform == 'linux2':
os.kill(pid, signal.SIGTERM)
elif sys.platform == 'win32':
subp.terminate()
if __name__ == '__main__':
main()
préparer la commande Linux timeout
n'est pas une mauvaise solution et ça a marché pour moi.
cmd = "timeout 20 "+ cmd
subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(output, err) = p.communicate()
malheureusement, je suis lié par des politiques très strictes sur la divulgation du code source par mon employeur, donc je ne peux pas fournir de code réel. Mais à mon goût, la meilleure solution est de créer une sous-classe supérieure Popen.wait()
à Poller au lieu d'attendre indéfiniment, et Popen.__init__
à accepter un paramètre de temporisation. Une fois que vous avez fait cela, toutes les autres méthodes Popen
(qui appellent wait
) fonctionneront comme prévu, y compris communicate
.
https://pypi.python.org/pypi/python-subprocess2 fournit des extensions au module de sous-processus qui vous permettent d'attendre jusqu'à une certaine période de temps, sinon prendre fin.
donc, attendre jusqu'à 10 secondes pour que le processus se termine, sinon tuer:
pipe = subprocess.Popen('...')
timeout = 10
results = pipe.waitOrTerminate(timeout)
compatible avec windows et unix. les "résultats" est un dictionnaire, il contient des "code_retour" qui est le retour de l'application (ou Aucun si il a dû être tué), ainsi que"actionprise". qui sera "SUBPROCESS2_PROCESS_COMPLETED" si le processus est terminé normalement, ou un masque de "SUBPROCESS2_PROCESS_TERMINATED" et SUBPROCESS2_PROCESS_CELLED en fonction des mesures prises (voir la documentation pour plus de détails)
cette solution tue l'arbre de processus en cas de shell=True, passe les paramètres au processus (ou non), a un timeout et obtient la sortie stdout, stderr et process du call back (elle utilise psutil pour le kill_proc_tree). Cela était basé sur plusieurs solutions publiées dans SO, y compris jcollado. Réponse aux commentaires D'Anson et de jradice dans la réponse de jcollado. Testé dans Windows Srvr 2012 et Ubuntu 14.04. Veuillez noter que pour Ubuntu, vous devez modifier le parent.enfant.(..) appel du parent.get_children(...).
def kill_proc_tree(pid, including_parent=True):
parent = psutil.Process(pid)
children = parent.children(recursive=True)
for child in children:
child.kill()
psutil.wait_procs(children, timeout=5)
if including_parent:
parent.kill()
parent.wait(5)
def run_with_timeout(cmd, current_dir, cmd_parms, timeout):
def target():
process = subprocess.Popen(cmd, cwd=current_dir, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
# wait for the process to terminate
if (cmd_parms == ""):
out, err = process.communicate()
else:
out, err = process.communicate(cmd_parms)
errcode = process.returncode
thread = Thread(target=target)
thread.start()
thread.join(timeout)
if thread.is_alive():
me = os.getpid()
kill_proc_tree(me, including_parent=False)
thread.join()
import subprocess, optparse, os, sys, re, datetime, threading, time, glob, shutil, xml.dom.minidom, traceback
class OutputManager:
def __init__(self, filename, mode, console, logonly):
self.con = console
self.logtoconsole = True
self.logtofile = False
if filename:
try:
self.f = open(filename, mode)
self.logtofile = True
if logonly == True:
self.logtoconsole = False
except IOError:
print (sys.exc_value)
print ("Switching to console only output...\n")
self.logtofile = False
self.logtoconsole = True
def write(self, data):
if self.logtoconsole == True:
self.con.write(data)
if self.logtofile == True:
self.f.write(data)
sys.stdout.flush()
def getTimeString():
return time.strftime("%Y-%m-%d", time.gmtime())
def runCommand(command):
'''
Execute a command in new thread and return the
stdout and stderr content of it.
'''
try:
Output = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True).communicate()[0]
except Exception as e:
print ("runCommand failed :%s" % (command))
print (str(e))
sys.stdout.flush()
return None
return Output
def GetOs():
Os = ""
if sys.platform.startswith('win32'):
Os = "win"
elif sys.platform.startswith('linux'):
Os = "linux"
elif sys.platform.startswith('darwin'):
Os = "mac"
return Os
def check_output(*popenargs, **kwargs):
try:
if 'stdout' in kwargs:
raise ValueError('stdout argument not allowed, it will be overridden.')
# Get start time.
startTime = datetime.datetime.now()
timeoutValue=3600
cmd = popenargs[0]
if sys.platform.startswith('win32'):
process = subprocess.Popen( cmd, stdout=subprocess.PIPE, shell=True)
elif sys.platform.startswith('linux'):
process = subprocess.Popen( cmd , stdout=subprocess.PIPE, shell=True )
elif sys.platform.startswith('darwin'):
process = subprocess.Popen( cmd , stdout=subprocess.PIPE, shell=True )
stdoutdata, stderrdata = process.communicate( timeout = timeoutValue )
retcode = process.poll()
####################################
# Catch crash error and log it.
####################################
OutputHandle = None
try:
if retcode >= 1:
OutputHandle = OutputManager( 'CrashJob_' + getTimeString() + '.txt', 'a+', sys.stdout, False)
OutputHandle.write( cmd )
print (stdoutdata)
print (stderrdata)
sys.stdout.flush()
except Exception as e:
print (str(e))
except subprocess.TimeoutExpired:
####################################
# Catch time out error and log it.
####################################
Os = GetOs()
if Os == 'win':
killCmd = "taskkill /FI \"IMAGENAME eq {0}\" /T /F"
elif Os == 'linux':
killCmd = "pkill {0)"
elif Os == 'mac':
# Linux, Mac OS
killCmd = "killall -KILL {0}"
runCommand(killCmd.format("java"))
runCommand(killCmd.format("YouApp"))
OutputHandle = None
try:
OutputHandle = OutputManager( 'KillJob_' + getTimeString() + '.txt', 'a+', sys.stdout, False)
OutputHandle.write( cmd )
except Exception as e:
print (str(e))
except Exception as e:
for frame in traceback.extract_tb(sys.exc_info()[2]):
fname,lineno,fn,text = frame
print "Error in %s on line %d" % (fname, lineno)
essayait juste d'écrire quelque chose de plus simple.
#!/usr/bin/python
from subprocess import Popen, PIPE
import datetime
import time
popen = Popen(["/bin/sleep", "10"]);
pid = popen.pid
sttime = time.time();
waittime = 3
print "Start time %s"%(sttime)
while True:
popen.poll();
time.sleep(1)
rcode = popen.returncode
now = time.time();
if [ rcode is None ] and [ now > (sttime + waittime) ] :
print "Killing it now"
popen.kill()