Lecture Non bloquante sur un sous-processus.PIPE en python

j'utilise le module de sous-processus pour démarrer un sous-processus et se connecter à son flux de sortie (stdout). Je veux être capable d'exécuter des lectures non bloquantes sur son stdout. Est-il un moyen de faire .readline non-blocking ou pour vérifier s'il y a des données sur le flux avant que j'invoque .readline ? J'aimerais que ce soit portable ou du moins travailler sous Windows et Linux.

voici comment je le fais pour l'instant (il bloque sur le .readline si aucune donnée n'est disponible):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
431
demandé sur Community 2008-12-17 20:56:34

26 réponses

fcntl , select , asyncproc ça n'aidera pas dans ce cas.

un moyen fiable de lire un flux sans blocage quel que soit le système d'exploitation est d'utiliser Queue.get_nowait() :

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line
353
répondu jfs 2018-08-06 15:02:06

j'ai souvent eu un problème similaire; les programmes Python que j'écris ont souvent besoin d'avoir la capacité d'exécuter certaines fonctionnalités primaires tout en acceptant simultanément les entrées utilisateur de la ligne de commande (stdin). Le simple fait de placer la fonctionnalité de gestion des entrées de l'utilisateur dans un autre thread ne résout pas le problème car readline() bloque et n'a pas de délai d'attente. Si la fonctionnalité principale est complète et qu'il n'y a plus besoin d'attendre la prochaine entrée de l'utilisateur, je veux généralement mon programme pour sortir, mais il ne peut pas parce que readline() bloque toujours dans l'autre thread en attente d'une ligne. Une solution que j'ai trouvée à ce problème est de faire de stdin un fichier non-bloquant en utilisant le module fcntl:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

à mon avis, c'est un peu plus propre que d'utiliser les modules select ou signal pour résoudre ce problème, mais là encore il ne fonctionne que sur UNIX...

70
répondu Jesse 2013-11-20 02:41:51

Python 3.4 introduit une nouvelle API provisoire pour IO asynchrone -- asyncio module .

L'approche est similaire à twisted réponse par @Bryan Ward -- définir un protocole et ses méthodes sont appelées dès que les données sont prêtes:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

Voir "sous-processus" dans les docs .

il y a une interface de haut niveau asyncio.create_subprocess_exec() qui retourne Process objets qui permet de lire une ligne asynchrone en utilisant StreamReader.readline() coroutine (avec async / await Python 3.5 + syntaxe ):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() exécute les tâches suivantes:

  • démarrer sous-processus, de rediriger sa sortie standard (stdout) à un tuyau
  • lire ligne à partir du stdout de subprocess asynchrone
  • tuer sous-processus
  • attendre pour la sortie de

chaque pas peut être limité par des secondes d'arrêt si nécessaire.

36
répondu jfs 2017-05-23 12:10:44

essayez le module asyncproc . Par exemple:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

le module s'occupe de tous les threads comme suggéré par S. Lott.

20
répondu Noah 2011-03-09 19:31:53
17
répondu Bryan Ward 2013-05-14 01:26:17

utiliser select & read(1).

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

Pour readline ():

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a
16
répondu Andy Jackson 2011-01-26 01:02:08

une solution est de faire un autre processus pour effectuer votre lecture du processus, ou faire un fil du processus avec un timeout.

Voici la version filetée d'une fonction de temporisation:

http://code.activestate.com/recipes/473878 /

Cependant, avez-vous besoin de lire le stdout comme ça vient? Une autre solution peut être de dumper la sortie dans un fichier et d'attendre que le processus se termine en utilisant p. attendez () .

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()
8
répondu monkut 2008-12-17 18:16:48

avertissement: cela ne fonctionne que pour tornado

vous pouvez le faire en paramétrant la fd pour qu'elle ne soit pas bloquée, puis en utilisant ioloop pour enregistrer les callbacks. J'ai emballé cela dans un oeuf appelé tornado_subprocess et vous pouvez l'installer via PyPI:

easy_install tornado_subprocess

maintenant, vous pouvez faire quelque chose comme ceci:

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

vous pouvez également l'utiliser avec un RequestHandler

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()
7
répondu Vukasin Toroman 2012-07-06 12:42:26

les solutions existantes ne fonctionnaient pas pour moi (détails ci-dessous). Ce qui a finalement fonctionné a été de mettre en œuvre readline en utilisant read(1) (basé sur cette réponse ). Ce dernier ne bloque pas:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

Pourquoi les solutions existantes ne fonctionne pas:

  1. les Solutions nécessitant une ligne de lecture (y compris les solutions basées sur la file d'attente) doivent toujours être bloquées. Il est difficile (impossible? pour tuer le thread qui exécute readline. Il ne s'tué quand le processus qui l'a créé finit, mais pas quand le processus de production de sortie est tué.
  2. comme l'a fait remarquer anonnn, le fait de mélanger des appels de niveau fcntl de faible intensité avec des appels de niveau readline de haut niveau pourrait ne pas fonctionner correctement.
  3. utilisant select.poll () est soigné, mais ne fonctionne pas sur Windows selon python docs.
  4. utiliser des bibliothèques tierces semble exagéré pour cette tâche et ajoute des dépendances supplémentaires.
6
répondu Vikram Pudi 2017-05-23 12:02:56

cette version de non-blocage lire ne nécessite pas modules spéciaux et va travailler out-of-the-box sur la majorité des distributions Linux.

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, async=True):
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())
4
répondu Tom Lime 2016-05-24 15:55:16

j'Ajoute ce problème pour lire un sous-processus.Popen stdout. Voici ma solution de lecture non bloquante:

import fcntl

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'
3
répondu Sebastien Claeys 2011-04-21 20:51:57

Voici mon code, utilisé pour saisir toutes les sorties du sous-processus DÈS QUE POSSIBLE, y compris les lignes partielles. Il pompe en même temps et stdout et stderr dans l'ordre presque correct.

testé et correctement travaillé sur Python 2.7 linux & windows.

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()
3
répondu datacompboy 2013-06-11 19:09:11

ajoutant cette réponse ici car il fournit la possibilité de mettre des pipes non-bloquantes sur Windows et Unix.

tous les ctypes sont des détails grâce à la réponse de @techtonik .

il existe une version légèrement modifiée à utiliser sur les systèmes Unix et Windows.

  • python3 compatible (changement mineur seulement nécessaire) .
  • Comprend posix version, et définit exception de l'utilisation pour.

de cette façon, vous pouvez utiliser la même fonction et exception pour le code Unix et Windows.

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # Constant could define globally but avoid polluting the name-space
        # thanks to: /q/non-blocking-read-on-os-pipe-on-windows-1707/"""
    Iterate over lines, yielding b'' when nothings left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case were reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'\n')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)
2
répondu ideasman42 2017-05-23 11:54:58

le module sélectionnez vous aide à déterminer où se trouve la prochaine entrée utile.

cependant, vous êtes presque toujours plus heureux avec des fils séparés. L'un fait un blocage lire le stdin, l'autre fait où il est que vous ne voulez pas bloqué.

1
répondu S.Lott 2008-12-17 18:19:25

j'ai créé une bibliothèque basée sur la solution de J. F. Sebastian . Vous pouvez l'utiliser.

https://github.com/cenkalti/what

0
répondu Cenk Alti 2017-05-23 12:34:50

à partir de la réponse de J. F. Sebastian, et de plusieurs autres sources, j'ai créé un simple sous-directeur de processus. Il fournit la lecture de requête sans blocage, ainsi que l'exécution de plusieurs processus en parallèle. Il n'utilise aucun appel spécifique à OS (que je sache) et devrait donc fonctionner n'importe où.

C'est disponible chez pypi, donc juste pip install shelljob . Reportez-vous à la page du projet pour des exemples et des documents complets.

0
répondu edA-qa mort-ora-y 2013-10-31 10:09:25

EDIT: cette implémentation bloque toujours. Utilisez plutôt la réponse de J. F. Sebastian.

j'ai essayé le top answer , mais le risque supplémentaire et la maintenance du code thread était inquiétant.

en regardant à travers le IO module (et étant limité à 2.6), j'ai trouvé BufferedReader. C'est ma solution sans fil, non-bloquante.

import io
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
  while p.poll() == None:
      time.sleep(SLEEP_DELAY)
      while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
          line = buffer.readline()
          # do stuff with the line

  # Handle any remaining output after the process has ended
  while buffer.peek():
    line = buffer.readline()
    # do stuff with the line
0
répondu romc 2017-05-23 12:02:56

je suis tombé récemment sur le même problème J'ai besoin de lire une ligne à la fois de flux ( queue exécuter en sous-processus ) en mode sans blocage Je voulais éviter les problèmes suivants: ne pas brûler cpu, ne pas lire stream par un octet (comme readline l'a fait ), etc

Voici mon implémentation https://gist.github.com/grubberr/5501e1a9760c3eab5e0a il ne supporte pas windows (poll), ne gère pas EOF, mais ça me va bien

0
répondu grubberr 2015-01-11 12:33:31

pourquoi déranger thread&queue? contrairement à readline (), BufferedReader.read1 () wont block waiting for \r\n, il retourne ASAP s'il y a une sortie qui arrive.

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print buf,

if __name__ == '__main__':
    __main__()
0
répondu mfmain 2015-01-20 01:41:15

dans mon cas, j'avais besoin d'un module de journalisation qui capte la sortie des applications d'arrière-plan et l'augmente(ajout d'horodateurs, de couleurs, etc.).

j'ai fini avec un thread d'arrière-plan qui fait le code d'entrée/sortie réel est seulement pour les plates-formes POSIX. J'ai enlevé des parties non essentielles.

si quelqu'un va utiliser cette bête pour de longues courses envisager la gestion des descripteurs ouverts. Dans mon cas, c'était pas un gros problème.

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)
0
répondu Dmytro 2015-02-01 16:32:43

ceci est un exemple pour exécuter une commande interactive dans un sous-processus, et le stdout est interactif en utilisant un pseudo terminal. Vous pouvez vous référer à: https://stackoverflow.com/a/43012138/3555925

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

while p.poll() is None:
    r, w, e = select.select([sys.stdin, master_fd], [], [])
    if sys.stdin in r:
        d = os.read(sys.stdin.fileno(), 10240)
        os.write(master_fd, d)
    elif master_fd in r:
        o = os.read(master_fd, 10240)
        if o:
            os.write(sys.stdout.fileno(), o)

# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
0
répondu Liao 2017-05-23 12:02:56

mon problème est un peu différent car je voulais collecter à la fois stdout et stderr à partir d'un processus en cours d'exécution, mais finalement le même car je voulais rendre la sortie dans un widget comme son généré.

Je n'ai pas voulu recourir à beaucoup des solutions de contournement proposées en utilisant des files d'attente ou des Threads supplémentaires car elles ne devraient pas être nécessaires pour effectuer une tâche aussi commune que l'exécution d'un autre script et la collecte de sa sortie.

après lecture de la proposition solutions et python docs j'ai résolu mon problème avec l'implémentation ci-dessous. Oui, cela ne fonctionne que pour POSIX car j'utilise l'appel de fonction select .

je conviens que les docs sont confus et la mise en œuvre est maladroite pour une telle tâche de script commune. Je crois que les versions plus anciennes de python ont différentes valeurs par défaut pour Popen et différentes explications qui ont créé beaucoup de confusion. Cela semble bien fonctionner pour Python 2.7.12 et 3.5.2.

la clé était de définir bufsize=1 pour la mise en tampon de ligne et ensuite universal_newlines=True à traiter comme un fichier texte au lieu d'un binaire qui semble devenir la valeur par défaut lors de la définition bufsize=1 .

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

erreur, DEBUG et VERBOSE sont simplement des macros qui impriment la sortie vers le terminal.

cette solution est IMHO 99.99% efficace car il utilise toujours la fonction de blocage readline , donc nous supposons que le sous-processus est agréable et les résultats complets ligne.

je souhaite recevoir des commentaires pour améliorer la solution car je suis encore nouveau en Python.

0
répondu Brooke Wallace 2017-06-27 22:01:20

Cette solution utilise le module select pour" lire toutes les données disponibles " d'un flux IO. Cette fonction bloque d'abord jusqu'à ce que les données soient disponibles, mais lit ensuite seulement les données qui sont disponibles et ne bloque pas plus loin.

étant donné qu'il utilise le module select , cela ne fonctionne que sur Unix.

le code est entièrement conforme à PEP8.

import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available, then all available data is then read and returned.
    This function returns an empty string when end of stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer
0
répondu Bradley Odell 2017-10-02 20:55:47

j'ai également fait face au problème décrit par Jesse et l'ai résolu en utilisant" select "comme Bradley , Andy et d'autres ont fait, mais dans un mode de blocage pour éviter une boucle occupée. Il utilise un tuyau factice comme un faux stdin. Les blocs choisis et attendre que stdin ou le tuyau soit prêt. Quand une touche est pressée stdin débloque le select et la valeur de la touche peut être récupérée avec read(1). Quand un autre fil écrit au tuyau alors le tuyau débloque le select et il peut être considéré comme une indication que le besoin de stdin est terminé. Voici un code de référence:

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    # ... do your stuff with the key value

# Faked keystroke
else:
    # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()
0
répondu gonzaedu61 2018-04-07 08:09:33

j'ai le problème de l'auteur de la question, mais je n'ai pas voulu invoquer des fils. J'ai mélangé la solution de Jesse avec une lecture directe () à partir du tuyau, et mon propre buffer-handler pour la ligne lit (cependant, mon sous-processus - ping - a toujours écrit des lignes complètes < une taille de page système). J'évite d'être occupé-d'attendre en lisant uniquement dans une montre enregistrée à gobject. Ces jours-ci, j'exécute habituellement du code dans un MainLoop de gobject pour éviter les threads.

def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down

L'observateur est

def watch(f, *other):
print 'reading',f.read()
return True

et le programme principal met en place un ping puis appelle la boucle de messagerie gobject.

def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()

tout autre travail est relié aux callbacks dans gobject.

-1
répondu Dave Kitchen 2018-09-26 13:19:22

voici un module qui supporte les lectures non-bloquantes et les Écritures en arrière-plan en python:

https://pypi.python.org/pypi/python-nonblock

fournit une fonction,

nonblock_read qui lira les données du flux, si elles sont disponibles, sinon retournez une chaîne vide (ou None si le flux est fermé de l'autre côté et que toutes les données possibles ont été lues)

vous pouvez aussi considérons le module python-subprocess2,

https://pypi.python.org/pypi/python-subprocess2

qui s'ajoute au module de sous-processus. Donc sur l'objet retourné de " subprocess.Popen " est ajouté une méthode supplémentaire, runInBackground. Cela démarre un thread et renvoie un objet qui sera automatiquement peuplé comme stuff est écrit à stdout/stderr, sans bloquer votre thread principal.

Profitez-en!

-2
répondu Tim Savannah 2016-01-16 21:47:15