Comprendre le Multiprocessing: gestion de la mémoire partagée, Serrures et Files d'attente en Python
Multitraitement est un outil puissant en python, et je veux le comprendre plus en profondeur. Je veux savoir quand utiliser Serrures et Files d'attente et quand utiliser un traitement multiple Gestionnaire les partager entre tous les processus.
j'ai proposé les scénarios de test suivants avec quatre conditions différentes pour le multiprocessing:
utilisation d'une piscine et NO Gestionnaire
utilisation d'un pool et D'un Manager
utilisant des processus individuels et NO Gestionnaire
utilisation de processus individuels et D'un gestionnaire
Le Travail
toutes les conditions exécutent une fonction de travail the_job
. the_job
consiste en une impression qui est sécurisée par une serrure. En outre, l'entrée de la fonction est simplement mis dans un file d'attente (pour voir si elle peut être récupérée à partir de la file d'attente). Cette entrée est tout simplement un index idx
range(10)
créé dans le script principal, appelé start_scenario
(en bas).
def the_job(args):
"""The job for multiprocessing.
Prints some stuff secured by a lock and
finally puts the input into a queue.
"""
idx = args[0]
lock = args[1]
queue=args[2]
lock.acquire()
print 'I'
print 'was '
print 'here '
print '!!!!'
print '1111'
print 'einhundertelfzigelfn'
who= ' By run %d n' % idx
print who
lock.release()
queue.put(idx)
Le succès d'une condition est définie comme parfaitement rappelant l'entrée
de la file d'attente, Voir la fonction read_queue
au fond.
Les Conditions
les conditions 1 et 2 sont assez explicites. La Condition 1 implique la création d'une serrure et d'une file d'attente, et le passage ces à un processus de piscine:
def scenario_1_pool_no_manager(jobfunc, args, ncores):
"""Runs a pool of processes WITHOUT a Manager for the lock and queue.
FAILS!
"""
mypool = mp.Pool(ncores)
lock = mp.Lock()
queue = mp.Queue()
iterator = make_iterator(args, lock, queue)
mypool.imap(jobfunc, iterator)
mypool.close()
mypool.join()
return read_queue(queue)
(La fonction d'assistance make_iterator
est donnée au bas de ce post.)
Conditions 1 échoue avec RuntimeError: Lock objects should only be shared between processes through inheritance
.
la Condition 2 est assez similaire, mais maintenant la serrure et la file d'attente sont sous la supervision d'un manager:
def scenario_2_pool_manager(jobfunc, args, ncores):
"""Runs a pool of processes WITH a Manager for the lock and queue.
SUCCESSFUL!
"""
mypool = mp.Pool(ncores)
lock = mp.Manager().Lock()
queue = mp.Manager().Queue()
iterator = make_iterator(args, lock, queue)
mypool.imap(jobfunc, iterator)
mypool.close()
mypool.join()
return read_queue(queue)
en condition 3, de nouveaux processus sont lancés manuellement, et la serrure et la file d'attente sont créées sans gestionnaire:
def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
"""Runs an individual process for every task WITHOUT a Manager,
SUCCESSFUL!
"""
lock = mp.Lock()
queue = mp.Queue()
iterator = make_iterator(args, lock, queue)
do_job_single_processes(jobfunc, iterator, ncores)
return read_queue(queue)
la Condition 4 est similaire mais utilise maintenant une gestionnaire:
def scenario_4_single_processes_manager(jobfunc, args, ncores):
"""Runs an individual process for every task WITH a Manager,
SUCCESSFUL!
"""
lock = mp.Manager().Lock()
queue = mp.Manager().Queue()
iterator = make_iterator(args, lock, queue)
do_job_single_processes(jobfunc, iterator, ncores)
return read_queue(queue)
dans les deux conditions-3 et 4-je commence un nouveau
pour chacune des 10 tâches de the_job
avec au plus nercores processus
d'exploitation au même moment. Ceci est réalisé avec la fonction d'aide suivante:
def do_job_single_processes(jobfunc, iterator, ncores):
"""Runs a job function by starting individual processes for every task.
At most `ncores` processes operate at the same time
:param jobfunc: Job to do
:param iterator:
Iterator over different parameter settings,
contains a lock and a queue
:param ncores:
Number of processes operating at the same time
"""
keep_running=True
process_dict = {} # Dict containing all subprocees
while len(process_dict)>0 or keep_running:
terminated_procs_pids = []
# First check if some processes did finish their job
for pid, proc in process_dict.iteritems():
# Remember the terminated processes
if not proc.is_alive():
terminated_procs_pids.append(pid)
# And delete these from the process dict
for terminated_proc in terminated_procs_pids:
process_dict.pop(terminated_proc)
# If we have less active processes than ncores and there is still
# a job to do, add another process
if len(process_dict) < ncores and keep_running:
try:
task = iterator.next()
proc = mp.Process(target=jobfunc,
args=(task,))
proc.start()
process_dict[proc.pid]=proc
except StopIteration:
# All tasks have been started
keep_running=False
time.sleep(0.1)
Le Résultat
seule la condition 1 échoue (RuntimeError: Lock objects should only be shared between processes through inheritance
) alors que les 3 autres conditions sont réussies. J'ai essayer d'envelopper ma tête autour de ce résultat.
Pourquoi la piscine besoin de partager un verrouillage et une file d'attente entre tous les processus mais pas les processus individuels de la condition 3?
ce que je sais c'est que pour les conditions de pool (1 et 2) Toutes les données des itérateurs sont passées par pickling, alors que dans les conditions de processus simples (3 et 4) toutes les données des itérateurs sont passées par héritage du processus principal (j'utilise Linux).
Je suppose que jusqu'à ce que la mémoire soit changée de l'intérieur d'un processus enfant, le même souvenir que le processus parental utilise est accessible (copy-on-write). Mais dès que l'on dit lock.acquire()
, cela devrait être changé et les processus enfants utilisent des serrures différentes placées ailleurs dans la mémoire, n'est-ce pas? Comment un enfant de savoir qu'un frère a activé un verrou qui n'est pas partagé par l'intermédiaire d'un gestionnaire?
enfin, un peu apparenté est ma question à quel point les conditions 3 et 4 sont différentes. Les deux ayant chacun des processus, mais ils diffèrent dans l'utilisation d'un gestionnaire. Sont considérés comme des valide code? Ou doit-on éviter d'utiliser un gestionnaire si il n'ya vraiment pas besoin d'un?
Texte Complet
pour ceux qui veulent simplement copier et coller tout pour exécuter le code, Voici le script complet:
__author__ = 'Me and myself'
import multiprocessing as mp
import time
def the_job(args):
"""The job for multiprocessing.
Prints some stuff secured by a lock and
finally puts the input into a queue.
"""
idx = args[0]
lock = args[1]
queue=args[2]
lock.acquire()
print 'I'
print 'was '
print 'here '
print '!!!!'
print '1111'
print 'einhundertelfzigelfn'
who= ' By run %d n' % idx
print who
lock.release()
queue.put(idx)
def read_queue(queue):
"""Turns a qeue into a normal python list."""
results = []
while not queue.empty():
result = queue.get()
results.append(result)
return results
def make_iterator(args, lock, queue):
"""Makes an iterator over args and passes the lock an queue to each element."""
return ((arg, lock, queue) for arg in args)
def start_scenario(scenario_number = 1):
"""Starts one of four multiprocessing scenarios.
:param scenario_number: Index of scenario, 1 to 4
"""
args = range(10)
ncores = 3
if scenario_number==1:
result = scenario_1_pool_no_manager(the_job, args, ncores)
elif scenario_number==2:
result = scenario_2_pool_manager(the_job, args, ncores)
elif scenario_number==3:
result = scenario_3_single_processes_no_manager(the_job, args, ncores)
elif scenario_number==4:
result = scenario_4_single_processes_manager(the_job, args, ncores)
if result != args:
print 'Scenario %d fails: %s != %s' % (scenario_number, args, result)
else:
print 'Scenario %d successful!' % scenario_number
def scenario_1_pool_no_manager(jobfunc, args, ncores):
"""Runs a pool of processes WITHOUT a Manager for the lock and queue.
FAILS!
"""
mypool = mp.Pool(ncores)
lock = mp.Lock()
queue = mp.Queue()
iterator = make_iterator(args, lock, queue)
mypool.map(jobfunc, iterator)
mypool.close()
mypool.join()
return read_queue(queue)
def scenario_2_pool_manager(jobfunc, args, ncores):
"""Runs a pool of processes WITH a Manager for the lock and queue.
SUCCESSFUL!
"""
mypool = mp.Pool(ncores)
lock = mp.Manager().Lock()
queue = mp.Manager().Queue()
iterator = make_iterator(args, lock, queue)
mypool.map(jobfunc, iterator)
mypool.close()
mypool.join()
return read_queue(queue)
def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
"""Runs an individual process for every task WITHOUT a Manager,
SUCCESSFUL!
"""
lock = mp.Lock()
queue = mp.Queue()
iterator = make_iterator(args, lock, queue)
do_job_single_processes(jobfunc, iterator, ncores)
return read_queue(queue)
def scenario_4_single_processes_manager(jobfunc, args, ncores):
"""Runs an individual process for every task WITH a Manager,
SUCCESSFUL!
"""
lock = mp.Manager().Lock()
queue = mp.Manager().Queue()
iterator = make_iterator(args, lock, queue)
do_job_single_processes(jobfunc, iterator, ncores)
return read_queue(queue)
def do_job_single_processes(jobfunc, iterator, ncores):
"""Runs a job function by starting individual processes for every task.
At most `ncores` processes operate at the same time
:param jobfunc: Job to do
:param iterator:
Iterator over different parameter settings,
contains a lock and a queue
:param ncores:
Number of processes operating at the same time
"""
keep_running=True
process_dict = {} # Dict containing all subprocees
while len(process_dict)>0 or keep_running:
terminated_procs_pids = []
# First check if some processes did finish their job
for pid, proc in process_dict.iteritems():
# Remember the terminated processes
if not proc.is_alive():
terminated_procs_pids.append(pid)
# And delete these from the process dict
for terminated_proc in terminated_procs_pids:
process_dict.pop(terminated_proc)
# If we have less active processes than ncores and there is still
# a job to do, add another process
if len(process_dict) < ncores and keep_running:
try:
task = iterator.next()
proc = mp.Process(target=jobfunc,
args=(task,))
proc.start()
process_dict[proc.pid]=proc
except StopIteration:
# All tasks have been started
keep_running=False
time.sleep(0.1)
def main():
"""Runs 1 out of 4 different multiprocessing scenarios"""
start_scenario(1)
if __name__ == '__main__':
main()
1 réponses
multiprocessing.Lock
est implémenté en utilisant un objet sémaphore fourni par L'OS. Sous Linux, L'enfant hérite d'une poignée au sémaphore du parent via os.fork
. Ce n'est pas une copie du Sémaphore; c'est en fait hériter le même handle que le parent a, de la même façon que les descripteurs de fichiers peuvent être hérités. Windows, par contre, ne supporte pas os.fork
, de sorte qu'il doit décaper le Lock
. Il le fait en créant une poignée dupliquée sur Le Sémaphore de Windows utilisé en interne par le multiprocessing.Lock
l'objet, à l'aide de Windows DuplicateHandle
API, qui stipule:
la poignée dupliquée fait référence au même objet que la poignée originale. Par conséquent, tout changement apporté à l'objet est reflété à la fois poignées
DuplicateHandle
API vous permet de donner la propriété de la poignée dupliquée au processus enfant, de sorte que le processus enfant peut réellement l'utiliser après l'avoir déballé. En créant une poignée dupliquée appartenant à l'enfant, vous peut effectivement "partager" le verrou de l'objet.
Voici l'objet sémaphore dans multiprocessing/synchronize.py
class SemLock(object):
def __init__(self, kind, value, maxvalue):
sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
debug('created semlock with handle %s' % sl.handle)
self._make_methods()
if sys.platform != 'win32':
def _after_fork(obj):
obj._semlock._after_fork()
register_after_fork(self, _after_fork)
def _make_methods(self):
self.acquire = self._semlock.acquire
self.release = self._semlock.release
self.__enter__ = self._semlock.__enter__
self.__exit__ = self._semlock.__exit__
def __getstate__(self): # This is called when you try to pickle the `Lock`.
assert_spawning(self)
sl = self._semlock
return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)
def __setstate__(self, state): # This is called when unpickling a `Lock`
self._semlock = _multiprocessing.SemLock._rebuild(*state)
debug('recreated blocker with handle %r' % state[0])
self._make_methods()
Notez le assert_spawning
appel __getstate__
, qui est appelé lors du décapage de l'objet. Voici comment cela est mis en place:
#
# Check that the current thread is spawning a child process
#
def assert_spawning(self):
if not Popen.thread_is_spawning():
raise RuntimeError(
'%s objects should only be shared between processes'
' through inheritance' % type(self).__name__
)
cette fonction est celle qui s'assure que vous "héritez" du Lock
, en appelant thread_is_spawning
. Sur Linux, cette méthode retourne juste False
:
@staticmethod
def thread_is_spawning():
return False
C'est parce que Linux n'a pas besoin de hériter Lock
, si __getstate__
est en fait appelé sur Linux, nous ne devons pas être en train d'hériter. Sur Windows, il y a plus de choses:
def dump(obj, file, protocol=None):
ForkingPickler(file, protocol).dump(obj)
class Popen(object):
'''
Start a subprocess to run the code of a process object
'''
_tls = thread._local()
def __init__(self, process_obj):
...
# send information to child
prep_data = get_preparation_data(process_obj._name)
to_child = os.fdopen(wfd, 'wb')
Popen._tls.process_handle = int(hp)
try:
dump(prep_data, to_child, HIGHEST_PROTOCOL)
dump(process_obj, to_child, HIGHEST_PROTOCOL)
finally:
del Popen._tls.process_handle
to_child.close()
@staticmethod
def thread_is_spawning():
return getattr(Popen._tls, 'process_handle', None) is not None
Ici thread_is_spawning
retourne True
si le Popen._tls
l'objet a un process_handle
l'attribut. Nous pouvons voir que le process_handle
l'attribut est créé dans __init__
, puis les données que nous voulons héritées sont passées du parent à l'enfant en utilisant dump
, l'attribut est supprimé. Donc thread_is_spawning
uniquement True
cours __init__
. Selon ce fil de liste de diffusion python-ideas, c'est en fait une limitation artificielle pour simuler le même comportement que os.fork
sur Linux. En fait, Windows charge la transmission de l' Lock
à tout moment, parce que DuplicateHandle
peut être exécuté à tout moment.
Tout ce qui précède s'applique à l' Queue
objet parce qu'il utilise Lock
en interne.
je dirais que héritant Lock
objets est préférable à l'utilisation d'un Manager.Lock()
, parce que lorsque vous utilisez un Manager.Lock
, chaque appel que vous faites à l' Lock
doit être envoyé à Manager
processus, qui va être beaucoup plus lent que d'utiliser un shared Lock
qui vit à l'intérieur du processus appelant. Les deux approches sont parfaitement valables.
Enfin, il est possible de passer un Lock
tous les membres d'un Pool
sans l'aide d'un Manager
, en utilisant le initializer
/initargs
mots-clés arguments:
lock = None
def initialize_lock(l):
global lock
lock = l
def scenario_1_pool_no_manager(jobfunc, args, ncores):
"""Runs a pool of processes WITHOUT a Manager for the lock and queue.
"""
lock = mp.Lock()
mypool = mp.Pool(ncores, initializer=initialize_lock, initargs=(lock,))
queue = mp.Queue()
iterator = make_iterator(args, queue)
mypool.imap(jobfunc, iterator) # Don't pass lock. It has to be used as a global in the child. (This means `jobfunc` would need to be re-written slightly.
mypool.close()
mypool.join()
return read_queue(queue)
cela fonctionne parce que les arguments passés à initargs
passer à la __init__
méthode de l' Process
objets qui s'exécutent à l'intérieur de l' Pool
, de sorte qu'ils finissent par être héréditaire, plutôt que marinés.