Comment dois-je me connecter en utilisant multiprocessing en Python?

en ce moment j'ai un module central dans un framework qui génère plusieurs processus en utilisant le module de python 2.6 multiprocessing module . Parce qu'il utilise multiprocessing , il y a un journal multiprocessing-aware au niveau du module, LOG = multiprocessing.get_logger() . Par le docs , ce logger a des verrous de processus partagés de sorte que vous ne gargouillez pas les choses dans sys.stderr (ou n'importe quel filehandle) en ayant plusieurs processus lui écrivant simultanément.

Le problème que j'ai maintenant est que les autres modules du cadre ne sont pas multiprocesseurs-conscients. De la façon dont je le vois, je dois faire en sorte que toutes les dépendances de ce module central utilisent la journalisation multiprocessing-aware. C'est ennuyeux dans le cadre, encore moins pour tous les clients du cadre. Existe-il des solutions de rechange je ne pense pas?

184
demandé sur cdleary 2009-03-13 07:02:31

19 réponses

La seule façon de traiter avec ce de manière non intrusive est:

  1. Spawn à chaque processus de travail de sorte que son journal va à un autre descripteur de fichier (à disque ou à la pipe. Idéalement, toutes les entrées de journal devraient être horodatées.
  2. votre processus de contrôleur peut alors faire un des suivants:
    • si vous utilisez des fichiers disques: Coalesce les fichiers journaux à la fin de la course, triées par date
    • Si l'aide de tuyaux (recommandé): Fusionner les entrées de journal à la volée à partir de tous les tuyaux, dans un fichier journal. (Par exemple, Périodiquement select à partir des descripteurs de fichiers des pipes, effectuer le tri par fusion sur les entrées de journal disponibles, et la chasse à Journal centralisé. Répéter.)
54
répondu vladr 2015-04-20 13:28:38

je viens d'écrire mon propre gestionnaire de rondins qui alimente tout le processus parent via un tuyau. Ça ne fait que dix minutes que je le teste, mais ça a l'air de bien marcher.

( Note: C'est codé en dur RotatingFileHandler , ce qui est mon propre cas d'utilisation.)


mise à Jour: @javier maintenant maintient cette approche comme un paquet disponible sur Pypi - voir multitraitement d'enregistrement sur Pypi, github à https://github.com/jruere/multiprocessing-logging


Mise À Jour: La Mise En Œuvre!

cela utilise maintenant une file d'attente pour le traitement correct de la concurrence, et récupère également des erreurs correctement. Je l'utilise maintenant en production depuis plusieurs mois, et la version actuelle ci-dessous fonctionne sans problème.

from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback

class MultiProcessingLog(logging.Handler):
    def __init__(self, name, mode, maxsize, rotate):
        logging.Handler.__init__(self)

        self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
        self.queue = multiprocessing.Queue(-1)

        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # ensure that exc_info and args
        # have been stringified.  Removes any chance of
        # unpickleable things inside and possibly reduces
        # message size sent over the pipe
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            dummy = self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)
94
répondu zzzeek 2018-01-24 15:06:26

encore une autre alternative pourrait être les divers gestionnaires de journalisation non basés sur des fichiers dans le logging paquet :

  • SocketHandler
  • DatagramHandler
  • SyslogHandler

(et autres)

de cette façon, vous pourriez facilement avoir un démon de journalisation quelque part que vous pourriez écrire en toute sécurité et traiteriez les résultats correctement. (Par exemple, un simple serveur socket qui déballe le message et l'envoie à son propre gestionnaire de fichiers rotatif.)

le SyslogHandler s'occuperait de ça pour vous aussi. Bien sûr, vous pouvez utiliser votre propre instance de syslog , pas le système.

18
répondu Ali Afshar 2015-04-20 13:29:55

une variante des autres qui maintient le fil de journalisation et de file d'attente séparé.

"""sample code for logging in subprocesses using multiprocessing

* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
  process.
* As in the other implementations, a thread reads the queue and calls the
  handlers. Except in this implementation, the thread is defined outside of a
  handler, which makes the logger definitions simpler.
* Works with multiple handlers.  If the logger in the main process defines
  multiple handlers, they will all be fed records generated by the
  subprocesses loggers.

tested with Python 2.5 and 2.6 on Linux and Windows

"""

import os
import sys
import time
import traceback
import multiprocessing, threading, logging, sys

DEFAULT_LEVEL = logging.DEBUG

formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")

class SubProcessLogHandler(logging.Handler):
    """handler used by subprocesses

    It simply puts items on a Queue for the main process to log.

    """

    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue

    def emit(self, record):
        self.queue.put(record)

class LogQueueReader(threading.Thread):
    """thread to write subprocesses log records to main process log

    This thread reads the records written by subprocesses and writes them to
    the handlers defined in the main process's handlers.

    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.daemon = True

    def run(self):
        """read from the queue and write to the log handlers

        The logging documentation says logging is thread safe, so there
        shouldn't be contention between normal logging (from the main
        process) and this thread.

        Note that we're using the name of the original logger.

        """
        # Thanks Mike for the error checking code.
        while True:
            try:
                record = self.queue.get()
                # get the logger for this record
                logger = logging.getLogger(record.name)
                logger.callHandlers(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

class LoggingProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def _setupLogger(self):
        # create the logger to use.
        logger = logging.getLogger('test.subprocess')
        # The only handler desired is the SubProcessLogHandler.  If any others
        # exist, remove them. In this case, on Unix and Linux the StreamHandler
        # will be inherited.

        for handler in logger.handlers:
            # just a check for my sanity
            assert not isinstance(handler, SubProcessLogHandler)
            logger.removeHandler(handler)
        # add the handler
        handler = SubProcessLogHandler(self.queue)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # On Windows, the level will not be inherited.  Also, we could just
        # set the level to log everything here and filter it in the main
        # process handlers.  For now, just set it from the global default.
        logger.setLevel(DEFAULT_LEVEL)
        self.logger = logger

    def run(self):
        self._setupLogger()
        logger = self.logger
        # and here goes the logging
        p = multiprocessing.current_process()
        logger.info('hello from process %s with pid %s' % (p.name, p.pid))


if __name__ == '__main__':
    # queue used by the subprocess loggers
    queue = multiprocessing.Queue()
    # Just a normal logger
    logger = logging.getLogger('test')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(DEFAULT_LEVEL)
    logger.info('hello from the main process')
    # This thread will read from the subprocesses and write to the main log's
    # handlers.
    log_queue_reader = LogQueueReader(queue)
    log_queue_reader.start()
    # create the processes.
    for i in range(10):
        p = LoggingProcess(queue)
        p.start()
    # The way I read the multiprocessing warning about Queue, joining a
    # process before it has finished feeding the Queue can cause a deadlock.
    # Also, Queue.empty() is not realiable, so just make sure all processes
    # are finished.
    # active_children joins subprocesses when they're finished.
    while multiprocessing.active_children():
        time.sleep(.1)
13
répondu ironhacker 2010-07-15 07:41:56

le python logging cookbook a deux exemples complets ici: https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes

il utilise QueueHandler , qui est nouveau en python 3.2 mais facile à copier dans votre propre code (comme je l'ai fait moi-même en python 2.7) à partir de: https://gist.github.com/vsajip/591589

chaque processus met son logging sur le Queue , et puis un fil ou un processus listener (un exemple est fourni pour chacun) ramasse ceux - ci et les écrit tous dans un fichier-aucun risque de corruption ou de brouillage.

12
répondu fantabolous 2015-08-18 06:47:17

toutes les solutions actuelles sont trop couplées à la configuration de journalisation en utilisant un handler. Ma solution a l'architecture et les caractéristiques suivantes:

  • vous pouvez utiliser n'importe quelle configuration de journalisation que vous voulez
  • enregistrement se fait dans un fil de démon
  • arrêt Sûr du démon à l'aide d'un gestionnaire de contexte
  • la Communication avec le fil de journalisation se fait par multiprocessing.Queue
  • dans les sous-processus, logging.Logger (et les instances déjà définies) sont patchés pour envoyer tous enregistrements à la file d'attente
  • Nouveau : format traceback et message avant l'envoi à la file d'attente pour prévenir les erreurs de décapage

le Code avec l'exemple d'utilisation et la sortie peut être trouvé à la Gist suivante: https://gist.github.com/schlamar/7003737

10
répondu schlamar 2013-10-16 08:04:10

ci-dessous est une autre solution avec un accent sur la simplicité pour tous les autres (comme moi) qui arrivent ici de Google. La journalisation devrait être facile! Uniquement pour les 3.2 ou supérieur.

import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random


def f(i):
    time.sleep(random.uniform(.01, .05))
    logging.info('function called with {} in worker thread.'.format(i))
    time.sleep(random.uniform(.01, .05))
    return i


def worker_init(q):
    # all records from worker processes go to qh and then into q
    qh = QueueHandler(q)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(qh)


def logger_init():
    q = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))

    # ql gets records from the queue and sends them to the handler
    ql = QueueListener(q, handler)
    ql.start()

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return ql, q


def main():
    q_listener, q = logger_init()

    logging.info('hello from main thread')
    pool = multiprocessing.Pool(4, worker_init, [q])
    for result in pool.map(f, range(10)):
        pass
    pool.close()
    pool.join()
    q_listener.stop()

if __name__ == '__main__':
    main()
10
répondu user2133814 2016-01-23 20:13:33

j'aime aussi la réponse de zzzeek mais Andre a raison qu'une file d'attente est nécessaire pour empêcher les grincements. J'ai eu un peu de chance avec le tuyau, mais j'ai vu des grincements qui sont quelque peu attendus. L'implémenter s'est avéré être plus difficile que je ne le pensais, notamment en raison de l'exécution sur Windows, où il y a quelques restrictions supplémentaires sur les variables globales et autres choses (voir: comment Python Multiprocessing est-il implémenté sur Windows? )

mais, je l'ai finalement eu travailler. Cet exemple n'est probablement pas parfait, donc les commentaires et suggestions sont les bienvenus. Il ne supporte pas non plus de définir le formatteur ou autre chose que le logger racine. Fondamentalement, vous devez réinitialiser le logger dans chacun des processus de pool avec la file d'attente et configurer les autres attributs sur le logger.

encore une fois, toute suggestion sur la façon d'améliorer le code est la bienvenue. Certes, je ne connais pas toutes les Python trucs encore :-)

import multiprocessing, logging, sys, re, os, StringIO, threading, time, Queue

class MultiProcessingLogHandler(logging.Handler):
    def __init__(self, handler, queue, child=False):
        logging.Handler.__init__(self)

        self._handler = handler
        self.queue = queue

        # we only want one of the loggers to be pulling from the queue.
        # If there is a way to do this without needing to be passed this
        # information, that would be great!
        if child == False:
            self.shutdown = False
            self.polltime = 1
            t = threading.Thread(target=self.receive)
            t.daemon = True
            t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        #print "receive on"
        while (self.shutdown == False) or (self.queue.empty() == False):
            # so we block for a short period of time so that we can
            # check for the shutdown cases.
            try:
                record = self.queue.get(True, self.polltime)
                self._handler.emit(record)
            except Queue.Empty, e:
                pass

    def send(self, s):
        # send just puts it in the queue for the server to retrieve
        self.queue.put(s)

    def _format_record(self, record):
        ei = record.exc_info
        if ei:
            dummy = self.format(record) # just to get traceback text into record.exc_text
            record.exc_info = None  # to avoid Unpickleable error

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        time.sleep(self.polltime+1) # give some time for messages to enter the queue.
        self.shutdown = True
        time.sleep(self.polltime+1) # give some time for the server to time out and see the shutdown

    def __del__(self):
        self.close() # hopefully this aids in orderly shutdown when things are going poorly.

def f(x):
    # just a logging command...
    logging.critical('function number: ' + str(x))
    # to make some calls take longer than others, so the output is "jumbled" as real MP programs are.
    time.sleep(x % 3)

def initPool(queue, level):
    """
    This causes the logging module to be initialized with the necessary info
    in pool threads to work correctly.
    """
    logging.getLogger('').addHandler(MultiProcessingLogHandler(logging.StreamHandler(), queue, child=True))
    logging.getLogger('').setLevel(level)

if __name__ == '__main__':
    stream = StringIO.StringIO()
    logQueue = multiprocessing.Queue(100)
    handler= MultiProcessingLogHandler(logging.StreamHandler(stream), logQueue)
    logging.getLogger('').addHandler(handler)
    logging.getLogger('').setLevel(logging.DEBUG)

    logging.debug('starting main')

    # when bulding the pool on a Windows machine we also have to init the logger in all the instances with the queue and the level of logging.
    pool = multiprocessing.Pool(processes=10, initializer=initPool, initargs=[logQueue, logging.getLogger('').getEffectiveLevel()] ) # start worker processes
    pool.map(f, range(0,50))
    pool.close()

    logging.debug('done')
    logging.shutdown()
    print "stream output is:"
    print stream.getvalue()
6
répondu Mike Miller 2017-05-23 11:54:36

puisque nous pouvons représenter la journalisation multiprocess comme beaucoup d'éditeurs et un abonné (auditeur), en utilisant ZeroMQ pour implémenter la messagerie PUB-SUB est en effet une option.

de plus, PyZMQ module, les fixations Python pour ZMQ, implémente PUBHandler , qui est un objet pour publier des messages de journalisation sur un zmq.PUB socket.

il y a une solution sur le web , pour la journalisation centralisée à partir de l'application distribuée en utilisant PyZMQ et PUBHandler, qui peut être facilement adopté pour travailler localement avec plusieurs processus de publication.

formatters = {
    logging.DEBUG: logging.Formatter("[%(name)s] %(message)s"),
    logging.INFO: logging.Formatter("[%(name)s] %(message)s"),
    logging.WARN: logging.Formatter("[%(name)s] %(message)s"),
    logging.ERROR: logging.Formatter("[%(name)s] %(message)s"),
    logging.CRITICAL: logging.Formatter("[%(name)s] %(message)s")
}

# This one will be used by publishing processes
class PUBLogger:
    def __init__(self, host, port=config.PUBSUB_LOGGER_PORT):
        self._logger = logging.getLogger(__name__)
        self._logger.setLevel(logging.DEBUG)
        self.ctx = zmq.Context()
        self.pub = self.ctx.socket(zmq.PUB)
        self.pub.connect('tcp://{0}:{1}'.format(socket.gethostbyname(host), port))
        self._handler = PUBHandler(self.pub)
        self._handler.formatters = formatters
        self._logger.addHandler(self._handler)

    @property
    def logger(self):
        return self._logger

# This one will be used by listener process
class SUBLogger:
    def __init__(self, ip, output_dir="", port=config.PUBSUB_LOGGER_PORT):
        self.output_dir = output_dir
        self._logger = logging.getLogger()
        self._logger.setLevel(logging.DEBUG)

        self.ctx = zmq.Context()
        self._sub = self.ctx.socket(zmq.SUB)
        self._sub.bind('tcp://*:{1}'.format(ip, port))
        self._sub.setsockopt(zmq.SUBSCRIBE, "")

        handler = handlers.RotatingFileHandler(os.path.join(output_dir,                 "client_debug.log"), "w", 100 * 1024 * 1024, 10)
        handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(asctime)s;%(levelname)s - %(message)s")
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)

  @property
  def sub(self):
      return self._sub

  @property
  def logger(self):
      return self._logger

#  And that's the way we actually run things:

# Listener process will forever listen on SUB socket for incoming messages
def run_sub_logger(ip, event):
    sub_logger = SUBLogger(ip)
    while not event.is_set():
        try:
            topic, message = sub_logger.sub.recv_multipart(flags=zmq.NOBLOCK)
            log_msg = getattr(logging, topic.lower())
            log_msg(message)
        except zmq.ZMQError as zmq_error:
            if zmq_error.errno == zmq.EAGAIN:
                pass


# Publisher processes loggers should be initialized as follows:

class Publisher:
    def __init__(self, stop_event, proc_id):
        self.stop_event = stop_event
        self.proc_id = proc_id
        self._logger = pub_logger.PUBLogger('127.0.0.1').logger

     def run(self):
         self._logger.info("{0} - Sending message".format(proc_id))

def run_worker(event, proc_id):
    worker = Publisher(event, proc_id)
    worker.run()

# Starting subscriber process so we won't loose publisher's messages
sub_logger_process = Process(target=run_sub_logger,
                                 args=('127.0.0.1'), stop_event,))
sub_logger_process.start()

#Starting publisher processes
for i in range(MAX_WORKERS_PER_CLIENT):
    processes.append(Process(target=run_worker,
                                 args=(stop_event, i,)))
for p in processes:
    p.start()
6
répondu Samuel 2018-04-08 08:27:22

juste de publier quelque part votre instance de l'enregistreur. de cette façon, les autres modules et clients peuvent utiliser votre API pour obtenir l'Enregistreur sans avoir à import multiprocessing .

3
répondu Javier 2009-03-13 04:40:00

j'ai aimé la réponse de zzzeek. Je substituerais simplement le tuyau à une file d'attente car si plusieurs threads/processus utilisent la même extrémité de tuyau pour générer des messages de journalisation, ils seront Brouillés.

3
répondu André Cruz 2009-06-06 13:59:52

Que Diriez-vous de déléguer toute la journalisation à un autre processus qui lit toutes les entrées de journaux à partir d'une file d'attente?

LOG_QUEUE = multiprocessing.JoinableQueue()

class CentralLogger(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        self.log = logger.getLogger('some_config')
        self.log.info("Started Central Logging process")

    def run(self):
        while True:
            log_level, message = self.queue.get()
            if log_level is None:
                self.log.info("Shutting down Central Logging process")
                break
            else:
                self.log.log(log_level, message)

central_logger_process = CentralLogger(LOG_QUEUE)
central_logger_process.start()

partagez simplement LOG_QUEUE via n'importe lequel des mécanismes multiprocessus ou même l'héritage et tout va bien!

2
répondu Sawan 2014-03-12 23:13:44

j'ai une solution similaire à ironhacker sauf que j'utilise la journalisation.exception dans certains de mes codes et j'ai trouvé que j'avais besoin de formater l'exception avant de la renvoyer au-dessus de la file d'attente puisque les tracebacks ne sont pas pickle'able:

class QueueHandler(logging.Handler):
    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue
    def emit(self, record):
        if record.exc_info:
            # can't pass exc_info across processes so just format now
            record.exc_text = self.formatException(record.exc_info)
            record.exc_info = None
        self.queue.put(record)
    def formatException(self, ei):
        sio = cStringIO.StringIO()
        traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
        s = sio.getvalue()
        sio.close()
        if s[-1] == "\n":
            s = s[:-1]
        return s
1
répondu Richard Jones 2010-08-05 06:11:37

ci-dessous est une classe qui peut être utilisée dans L'environnement Windows, nécessite ActivePython. Vous pouvez également hériter pour d'autres exploitants forestiers (StreamHandler etc.)

class SyncronizedFileHandler(logging.FileHandler):
    MUTEX_NAME = 'logging_mutex'

    def __init__(self , *args , **kwargs):

        self.mutex = win32event.CreateMutex(None , False , self.MUTEX_NAME)
        return super(SyncronizedFileHandler , self ).__init__(*args , **kwargs)

    def emit(self, *args , **kwargs):
        try:
            win32event.WaitForSingleObject(self.mutex , win32event.INFINITE)
            ret = super(SyncronizedFileHandler , self ).emit(*args , **kwargs)
        finally:
            win32event.ReleaseMutex(self.mutex)
        return ret

Et voici un exemple qui démontre l'usage:

import logging
import random , time , os , sys , datetime
from string import letters
import win32api , win32event
from multiprocessing import Pool

def f(i):
    time.sleep(random.randint(0,10) * 0.1)
    ch = random.choice(letters)
    logging.info( ch * 30)


def init_logging():
    '''
    initilize the loggers
    '''
    formatter = logging.Formatter("%(levelname)s - %(process)d - %(asctime)s - %(filename)s - %(lineno)d - %(message)s")
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    file_handler = SyncronizedFileHandler(sys.argv[1])
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

#must be called in the parent and in every worker process
init_logging() 

if __name__ == '__main__':
    #multiprocessing stuff
    pool = Pool(processes=10)
    imap_result = pool.imap(f , range(30))
    for i , _ in enumerate(imap_result):
        pass
1
répondu user6336812 2016-06-01 06:57:48

voici ma solution de contournement... pas la plus complète, mais facilement modifiable et plus simple à lire et à comprendre je pense que toutes les autres réponses que j'ai trouvées avant d'écrire ceci:

import logging
import multiprocessing

class FakeLogger(object):
    def __init__(self, q):
        self.q = q
    def info(self, item):
        self.q.put('INFO - {}'.format(item))
    def debug(self, item):
        self.q.put('DEBUG - {}'.format(item))
    def critical(self, item):
        self.q.put('CRITICAL - {}'.format(item))
    def warning(self, item):
        self.q.put('WARNING - {}'.format(item))

def some_other_func_that_gets_logger_and_logs(num):
    # notice the name get's discarded
    # of course you can easily add this to your FakeLogger class
    local_logger = logging.getLogger('local')
    local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
    local_logger.debug('hmm, something may need debugging here')
    return num*2

def func_to_parallelize(data_chunk):
    # unpack our args
    the_num, logger_q = data_chunk
    # since we're now in a new process, let's monkeypatch the logging module
    logging.getLogger = lambda name=None: FakeLogger(logger_q)
    # now do the actual work that happens to log stuff too
    new_num = some_other_func_that_gets_logger_and_logs(the_num)
    return (the_num, new_num)

if __name__ == '__main__':
    multiprocessing.freeze_support()
    m = multiprocessing.Manager()
    logger_q = m.Queue()
    # we have to pass our data to be parallel-processed
    # we also need to pass the Queue object so we can retrieve the logs
    parallelable_data = [(1, logger_q), (2, logger_q)]
    # set up a pool of processes so we can take advantage of multiple CPU cores
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
    worker_output = pool.map(func_to_parallelize, parallelable_data)
    pool.close() # no more tasks
    pool.join()  # wrap up current tasks
    # get the contents of our FakeLogger object
    while not logger_q.empty():
        print logger_q.get()
    print 'worker output contained: {}'.format(worker_output)
1
répondu nmz787 2016-09-13 16:55:19

l'une des alternatives est d'écrire la journalisation mutliprocessing à un fichier connu et d'enregistrer un handler atexit à joindre sur ces processus relisez-le sur stderr; cependant, vous n'obtiendrez pas un flux en temps réel aux messages de sortie sur stderr de cette façon.

0
répondu cdleary 2009-03-13 04:40:17

si vous avez des blocages se produisant dans une combinaison de serrures, de fils et de fourches dans le module logging , cela est rapporté dans rapport de bug 6721 (voir aussi related SO question ).

il y a une petite solution d'installation affichée ici .

cependant, cela ne fera que corriger les blocages potentiels dans logging . Qui ne va pas s'arranger, que les choses sont peut-être déformé. Voir la d'autres réponses présentées ici.

0
répondu Albert 2017-05-23 10:31:33

il y a ce grand paquet

Colis

: https://pypi.python.org/pypi/multiprocessing-logging /

Code

: https://github.com/jruere/multiprocessing-logging

Installation:

pip install multiprocessing-logging

puis Ajouter:

import multiprocessing_logging

# This enables logs inside process
multiprocessing_logging.install_mp_handler()
0
répondu juan Isaza 2018-04-07 02:55:58

À mes enfants qui rencontrent le même problème depuis des décennies, et trouve cette question sur ce site, je laisse cette réponse.

Simplicité vs de compliquer à l'excès. Juste utiliser d'autres outils. Python est génial, mais il n'a pas été conçu pour faire certaines choses.

l'extrait suivant pour logrotate le démon travaille pour moi et ne complique pas les choses. Programmez-le pour fonctionner toutes les heures et

/var/log/mylogfile.log {
    size 1
    copytruncate
    create
    rotate 10
    missingok
    postrotate
        timeext=`date -d '1 hour ago' "+%Y-%m-%d_%H"`
        mv /var/log/mylogfile.log.1 /var/log/mylogfile-$timeext.log
    endscript
}

C'est comme ça que je installez - le (symlinks ne fonctionnent pas pour logrotate):

sudo cp /directpath/config/logrotate/myconfigname /etc/logrotate.d/myconfigname
sudo cp /etc/cron.daily/logrotate /etc/cron.hourly/logrotate
-1
répondu baldr 2018-02-07 16:32:32