Comprendre le Multiprocessing: gestion de la mémoire partagée, Serrures et Files d'attente en Python

Multitraitement est un outil puissant en python, et je veux le comprendre plus en profondeur. Je veux savoir quand utiliser Serrures et Files d'attente et quand utiliser un traitement multiple Gestionnaire les partager entre tous les processus.

j'ai proposé les scénarios de test suivants avec quatre conditions différentes pour le multiprocessing:

  1. utilisation d'une piscine et NO Gestionnaire

  2. utilisation d'un pool et D'un Manager

  3. utilisant des processus individuels et NO Gestionnaire

  4. utilisation de processus individuels et D'un gestionnaire

Le Travail

toutes les conditions exécutent une fonction de travail the_job. the_job consiste en une impression qui est sécurisée par une serrure. En outre, l'entrée de la fonction est simplement mis dans un file d'attente (pour voir si elle peut être récupérée à partir de la file d'attente). Cette entrée est tout simplement un index idxrange(10) créé dans le script principal, appelé start_scenario (en bas).

def the_job(args):
    """The job for multiprocessing.

    Prints some stuff secured by a lock and 
    finally puts the input into a queue.

    """
    idx = args[0]
    lock = args[1]
    queue=args[2]

    lock.acquire()
    print 'I'
    print 'was '
    print 'here '
    print '!!!!'
    print '1111'
    print 'einhundertelfzigelfn'
    who= ' By run %d n' % idx
    print who
    lock.release()

    queue.put(idx)

Le succès d'une condition est définie comme parfaitement rappelant l'entrée de la file d'attente, Voir la fonction read_queue au fond.

Les Conditions

les conditions 1 et 2 sont assez explicites. La Condition 1 implique la création d'une serrure et d'une file d'attente, et le passage ces à un processus de piscine:

def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    FAILS!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    mypool.imap(jobfunc, iterator)

    mypool.close()
    mypool.join()

    return read_queue(queue)

(La fonction d'assistance make_iterator est donnée au bas de ce post.) Conditions 1 échoue avec RuntimeError: Lock objects should only be shared between processes through inheritance.

la Condition 2 est assez similaire, mais maintenant la serrure et la file d'attente sont sous la supervision d'un manager:

def scenario_2_pool_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITH a Manager for the lock and queue.

    SUCCESSFUL!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)
    mypool.imap(jobfunc, iterator)
    mypool.close()
    mypool.join()

    return read_queue(queue)

en condition 3, de nouveaux processus sont lancés manuellement, et la serrure et la file d'attente sont créées sans gestionnaire:

def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITHOUT a Manager,

    SUCCESSFUL!

    """
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)

la Condition 4 est similaire mais utilise maintenant une gestionnaire:

def scenario_4_single_processes_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITH a Manager,

    SUCCESSFUL!

    """
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)

dans les deux conditions-3 et 4-je commence un nouveau pour chacune des 10 tâches de the_job avec au plus nercores processus d'exploitation au même moment. Ceci est réalisé avec la fonction d'aide suivante:

def do_job_single_processes(jobfunc, iterator, ncores):
    """Runs a job function by starting individual processes for every task.

    At most `ncores` processes operate at the same time

    :param jobfunc: Job to do

    :param iterator:

        Iterator over different parameter settings,
        contains a lock and a queue

    :param ncores:

        Number of processes operating at the same time

    """
    keep_running=True
    process_dict = {} # Dict containing all subprocees

    while len(process_dict)>0 or keep_running:

        terminated_procs_pids = []
        # First check if some processes did finish their job
        for pid, proc in process_dict.iteritems():

            # Remember the terminated processes
            if not proc.is_alive():
                terminated_procs_pids.append(pid)

        # And delete these from the process dict
        for terminated_proc in terminated_procs_pids:
            process_dict.pop(terminated_proc)

        # If we have less active processes than ncores and there is still
        # a job to do, add another process
        if len(process_dict) < ncores and keep_running:
            try:
                task = iterator.next()
                proc = mp.Process(target=jobfunc,
                                                   args=(task,))
                proc.start()
                process_dict[proc.pid]=proc
            except StopIteration:
                # All tasks have been started
                keep_running=False

        time.sleep(0.1)

Le Résultat

seule la condition 1 échoue (RuntimeError: Lock objects should only be shared between processes through inheritance) alors que les 3 autres conditions sont réussies. J'ai essayer d'envelopper ma tête autour de ce résultat.

Pourquoi la piscine besoin de partager un verrouillage et une file d'attente entre tous les processus mais pas les processus individuels de la condition 3?

ce que je sais c'est que pour les conditions de pool (1 et 2) Toutes les données des itérateurs sont passées par pickling, alors que dans les conditions de processus simples (3 et 4) toutes les données des itérateurs sont passées par héritage du processus principal (j'utilise Linux). Je suppose que jusqu'à ce que la mémoire soit changée de l'intérieur d'un processus enfant, le même souvenir que le processus parental utilise est accessible (copy-on-write). Mais dès que l'on dit lock.acquire(), cela devrait être changé et les processus enfants utilisent des serrures différentes placées ailleurs dans la mémoire, n'est-ce pas? Comment un enfant de savoir qu'un frère a activé un verrou qui n'est pas partagé par l'intermédiaire d'un gestionnaire?

enfin, un peu apparenté est ma question à quel point les conditions 3 et 4 sont différentes. Les deux ayant chacun des processus, mais ils diffèrent dans l'utilisation d'un gestionnaire. Sont considérés comme des valide code? Ou doit-on éviter d'utiliser un gestionnaire si il n'ya vraiment pas besoin d'un?


Texte Complet

pour ceux qui veulent simplement copier et coller tout pour exécuter le code, Voici le script complet:

__author__ = 'Me and myself'

import multiprocessing as mp
import time

def the_job(args):
    """The job for multiprocessing.

    Prints some stuff secured by a lock and 
    finally puts the input into a queue.

    """
    idx = args[0]
    lock = args[1]
    queue=args[2]

    lock.acquire()
    print 'I'
    print 'was '
    print 'here '
    print '!!!!'
    print '1111'
    print 'einhundertelfzigelfn'
    who= ' By run %d n' % idx
    print who
    lock.release()

    queue.put(idx)


def read_queue(queue):
    """Turns a qeue into a normal python list."""
    results = []
    while not queue.empty():
        result = queue.get()
        results.append(result)
    return results


def make_iterator(args, lock, queue):
    """Makes an iterator over args and passes the lock an queue to each element."""
    return ((arg, lock, queue) for arg in args)


def start_scenario(scenario_number = 1):
    """Starts one of four multiprocessing scenarios.

    :param scenario_number: Index of scenario, 1 to 4

    """
    args = range(10)
    ncores = 3
    if scenario_number==1:
        result =  scenario_1_pool_no_manager(the_job, args, ncores)

    elif scenario_number==2:
        result =  scenario_2_pool_manager(the_job, args, ncores)

    elif scenario_number==3:
        result =  scenario_3_single_processes_no_manager(the_job, args, ncores)

    elif scenario_number==4:
        result =  scenario_4_single_processes_manager(the_job, args, ncores)

    if result != args:
        print 'Scenario %d fails: %s != %s' % (scenario_number, args, result)
    else:
        print 'Scenario %d successful!' % scenario_number


def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    FAILS!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    mypool.map(jobfunc, iterator)

    mypool.close()
    mypool.join()

    return read_queue(queue)


def scenario_2_pool_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITH a Manager for the lock and queue.

    SUCCESSFUL!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)
    mypool.map(jobfunc, iterator)
    mypool.close()
    mypool.join()

    return read_queue(queue)


def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITHOUT a Manager,

    SUCCESSFUL!

    """
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)


def scenario_4_single_processes_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITH a Manager,

    SUCCESSFUL!

    """
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)


def do_job_single_processes(jobfunc, iterator, ncores):
    """Runs a job function by starting individual processes for every task.

    At most `ncores` processes operate at the same time

    :param jobfunc: Job to do

    :param iterator:

        Iterator over different parameter settings,
        contains a lock and a queue

    :param ncores:

        Number of processes operating at the same time

    """
    keep_running=True
    process_dict = {} # Dict containing all subprocees

    while len(process_dict)>0 or keep_running:

        terminated_procs_pids = []
        # First check if some processes did finish their job
        for pid, proc in process_dict.iteritems():

            # Remember the terminated processes
            if not proc.is_alive():
                terminated_procs_pids.append(pid)

        # And delete these from the process dict
        for terminated_proc in terminated_procs_pids:
            process_dict.pop(terminated_proc)

        # If we have less active processes than ncores and there is still
        # a job to do, add another process
        if len(process_dict) < ncores and keep_running:
            try:
                task = iterator.next()
                proc = mp.Process(target=jobfunc,
                                                   args=(task,))
                proc.start()
                process_dict[proc.pid]=proc
            except StopIteration:
                # All tasks have been started
                keep_running=False

        time.sleep(0.1)


def main():
    """Runs 1 out of 4 different multiprocessing scenarios"""
    start_scenario(1)


if __name__ == '__main__':
    main()
36
demandé sur dano 2013-12-23 15:30:34

1 réponses

multiprocessing.Lock est implémenté en utilisant un objet sémaphore fourni par L'OS. Sous Linux, L'enfant hérite d'une poignée au sémaphore du parent via os.fork. Ce n'est pas une copie du Sémaphore; c'est en fait hériter le même handle que le parent a, de la même façon que les descripteurs de fichiers peuvent être hérités. Windows, par contre, ne supporte pas os.fork, de sorte qu'il doit décaper le Lock. Il le fait en créant une poignée dupliquée sur Le Sémaphore de Windows utilisé en interne par le multiprocessing.Lock l'objet, à l'aide de Windows DuplicateHandle API, qui stipule:

la poignée dupliquée fait référence au même objet que la poignée originale. Par conséquent, tout changement apporté à l'objet est reflété à la fois poignées

DuplicateHandle API vous permet de donner la propriété de la poignée dupliquée au processus enfant, de sorte que le processus enfant peut réellement l'utiliser après l'avoir déballé. En créant une poignée dupliquée appartenant à l'enfant, vous peut effectivement "partager" le verrou de l'objet.

Voici l'objet sémaphore dans multiprocessing/synchronize.py

class SemLock(object):

    def __init__(self, kind, value, maxvalue):
        sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
        debug('created semlock with handle %s' % sl.handle)
        self._make_methods()

        if sys.platform != 'win32':
            def _after_fork(obj):
                obj._semlock._after_fork()
            register_after_fork(self, _after_fork)

    def _make_methods(self):
        self.acquire = self._semlock.acquire
        self.release = self._semlock.release
        self.__enter__ = self._semlock.__enter__
        self.__exit__ = self._semlock.__exit__

    def __getstate__(self):  # This is called when you try to pickle the `Lock`.
        assert_spawning(self)
        sl = self._semlock
        return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)

    def __setstate__(self, state): # This is called when unpickling a `Lock`
        self._semlock = _multiprocessing.SemLock._rebuild(*state)
        debug('recreated blocker with handle %r' % state[0])
        self._make_methods()

Notez le assert_spawning appel __getstate__, qui est appelé lors du décapage de l'objet. Voici comment cela est mis en place:

#
# Check that the current thread is spawning a child process
#

def assert_spawning(self):
    if not Popen.thread_is_spawning():
        raise RuntimeError(
            '%s objects should only be shared between processes'
            ' through inheritance' % type(self).__name__
            )

cette fonction est celle qui s'assure que vous "héritez" du Lock, en appelant thread_is_spawning. Sur Linux, cette méthode retourne juste False:

@staticmethod
def thread_is_spawning():
    return False

C'est parce que Linux n'a pas besoin de hériter Lock, si __getstate__ est en fait appelé sur Linux, nous ne devons pas être en train d'hériter. Sur Windows, il y a plus de choses:

def dump(obj, file, protocol=None):
    ForkingPickler(file, protocol).dump(obj)

class Popen(object):
    '''
    Start a subprocess to run the code of a process object
    '''
    _tls = thread._local()

    def __init__(self, process_obj):
        ...
        # send information to child
        prep_data = get_preparation_data(process_obj._name)
        to_child = os.fdopen(wfd, 'wb')
        Popen._tls.process_handle = int(hp)
        try:
            dump(prep_data, to_child, HIGHEST_PROTOCOL)
            dump(process_obj, to_child, HIGHEST_PROTOCOL)
        finally:
            del Popen._tls.process_handle
            to_child.close()


    @staticmethod
    def thread_is_spawning():
        return getattr(Popen._tls, 'process_handle', None) is not None

Ici thread_is_spawning retourne True si le Popen._tls l'objet a un process_handle l'attribut. Nous pouvons voir que le process_handle l'attribut est créé dans __init__, puis les données que nous voulons héritées sont passées du parent à l'enfant en utilisant dump, l'attribut est supprimé. Donc thread_is_spawning uniquement True cours __init__. Selon ce fil de liste de diffusion python-ideas, c'est en fait une limitation artificielle pour simuler le même comportement que os.fork sur Linux. En fait, Windows charge la transmission de l' Lock à tout moment, parce que DuplicateHandle peut être exécuté à tout moment.

Tout ce qui précède s'applique à l' Queue objet parce qu'il utilise Lock en interne.

je dirais que héritant Lock objets est préférable à l'utilisation d'un Manager.Lock(), parce que lorsque vous utilisez un Manager.Lock, chaque appel que vous faites à l' Lock doit être envoyé à Manager processus, qui va être beaucoup plus lent que d'utiliser un shared Lock qui vit à l'intérieur du processus appelant. Les deux approches sont parfaitement valables.

Enfin, il est possible de passer un Lock tous les membres d'un Pool sans l'aide d'un Manager, en utilisant le initializer/initargs mots-clés arguments:

lock = None
def initialize_lock(l):
   global lock
   lock = l

def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    """
    lock = mp.Lock()
    mypool = mp.Pool(ncores, initializer=initialize_lock, initargs=(lock,))
    queue = mp.Queue()

    iterator = make_iterator(args, queue)

    mypool.imap(jobfunc, iterator) # Don't pass lock. It has to be used as a global in the child. (This means `jobfunc` would need to be re-written slightly.

    mypool.close()
    mypool.join()

return read_queue(queue)

cela fonctionne parce que les arguments passés à initargs passer à la __init__ méthode de l' Process objets qui s'exécutent à l'intérieur de l' Pool, de sorte qu'ils finissent par être héréditaire, plutôt que marinés.

28
répondu dano 2015-02-22 23:11:53