Ne peut pas pickle en utilisant multiprocessing Pool.cartographie()
j'essaie d'utiliser la fonction multiprocessing
s Pool.map()
pour répartir le travail simultanément. Quand j'utilise le code suivant, ça marche très bien:
import multiprocessing
def f(x):
return x*x
def go():
pool = multiprocessing.Pool(processes=4)
print pool.map(f, range(10))
if __name__== '__main__' :
go()
cependant, quand je l'utilise dans une approche plus orientée objet, il ne fonctionne pas. Le message d'erreur qu'il donne est:
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed
cela se produit lorsque le suivant est mon programme principal:
import someClass
if __name__== '__main__' :
sc = someClass.someClass()
sc.go()
et la suivante est ma classe someClass
:
import multiprocessing
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def go(self):
pool = multiprocessing.Pool(processes=4)
print pool.map(self.f, range(10))
Quelqu'un sait quel pourrait être le problème, ou un moyen facile de le contourner?
11 réponses
le problème est que le multiprocessing doit brosser les choses à les lancer parmi les processus, et les méthodes liées ne sont pas pickable. La solution (que vous le considériez " facile "ou non;-) est d'ajouter l'infrastructure à votre programme pour permettre à de telles méthodes d'être récupérées, en l'enregistrant avec la méthode de bibliothèque standard copy_reg .
par exemple, la contribution de Steven Bethard à ce fil (vers la fin de la thread) montre une approche parfaitement réalisable pour permettre la méthode décapage / unpickling via copy_reg
.
toutes ces solutions sont moches parce que le multiprocessing et le décapage est cassé et limité à moins que vous sautiez en dehors de la bibliothèque standard.
si vous utilisez une fourchette de multiprocessing
appelée pathos.multiprocesssing
, vous pouvez utiliser directement les méthodes classes et classes dans les fonctions map
de multiprocessing. C'est parce que dill
est utilisé au lieu de pickle
ou cPickle
, et dill
peut sérialiser presque tout en python.
>>> import pathos.pools as pp
>>> class someClass(object):
... def __init__(self):
... pass
... def f(self, x):
... return x*x
... def go(self):
... pool = pp.ProcessPool(4)
... print pool.map(self.f, range(10))
...
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>>
obtenir le code ici: https://github.com/uqfoundation/pathos
vous pouvez également définir une méthode __call__()
à l'intérieur de votre someClass()
, qui appelle someClass.go()
et passer ensuite une instance de someClass()
au pool. Cet objet est pickleable et il fonctionne bien (pour moi)...
Certaines limites bien que Steven Bethard la solution :
quand vous enregistrez votre méthode de classe comme une fonction, le destructeur de votre classe est étonnamment appelé chaque fois que votre traitement de méthode est terminé. Donc, si vous avez une instance de votre classe qui appelle n fois sa méthode, les membres peuvent disparaître entre 2 cycles et vous pouvez obtenir un message malloc: *** error for object 0x...: pointer being freed was not allocated
(par exemple ouvrir le fichier membre) ou pure virtual method called,
terminate called without an active exception
(ce qui signifie que la durée de vie d'un objet membre que j'ai utilisé était plus court que ce que je pensais). J'ai eu ça en traitant avec n plus grand que la taille de la piscine. Voici un bref exemple:
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult
# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
class Myclass(object):
def __init__(self, nobj, workers=cpu_count()):
print "Constructor ..."
# multi-processing
pool = Pool(processes=workers)
async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
pool.close()
# waiting for all results
map(ApplyResult.wait, async_results)
lst_results=[r.get() for r in async_results]
print lst_results
def __del__(self):
print "... Destructor"
def process_obj(self, index):
print "object %d" % index
return "results"
pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)
sortie:
Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor
la méthode __call__
n'est pas aussi équivalente, parce que [aucun,...] sont lus à partir des résultats:
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult
class Myclass(object):
def __init__(self, nobj, workers=cpu_count()):
print "Constructor ..."
# multiprocessing
pool = Pool(processes=workers)
async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
pool.close()
# waiting for all results
map(ApplyResult.wait, async_results)
lst_results=[r.get() for r in async_results]
print lst_results
def __call__(self, i):
self.process_obj(i)
def __del__(self):
print "... Destructor"
def process_obj(self, i):
print "obj %d" % i
return "result"
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once),
# **and** results are empty !
Si aucune des deux méthodes n'est satisfaisante...
il y a un autre raccourci que vous pouvez utiliser, bien qu'il puisse être inefficace en fonction de ce qui est dans vos instances de classe.
comme tout le monde l'a dit, le problème est que le code multiprocessing
doit pickle les choses qu'il envoie aux sous-processus qu'il a commencé, et le pickler ne fait pas instance-méthodes.
cependant, au lieu d'envoyer la méthode instance, vous pouvez envoyer la classe instance réelle, plus le nom de la fonction à appeler, à une fonction ordinaire qui utilise ensuite getattr
pour appeler la méthode instance, créant ainsi la méthode liée dans le sous-processus Pool
. Ceci est similaire à la définition d'un __call__
méthode sauf que vous pouvez appeler plus d'une fonction membre.
Stealing @EricH.'s code de sa réponse et l'annotant un peu (je l'ai retapé d'où tous les changements de nom et Tels, pour une raison quelconque cela a semblé plus facile que Couper-Coller : -)) pour l'illustration de toute la magie:
import multiprocessing
import os
def call_it(instance, name, args=(), kwargs=None):
"indirect caller for instance methods and multiprocessing"
if kwargs is None:
kwargs = {}
return getattr(instance, name)(*args, **kwargs)
class Klass(object):
def __init__(self, nobj, workers=multiprocessing.cpu_count()):
print "Constructor (in pid=%d)..." % os.getpid()
self.count = 1
pool = multiprocessing.Pool(processes = workers)
async_results = [pool.apply_async(call_it,
args = (self, 'process_obj', (i,))) for i in range(nobj)]
pool.close()
map(multiprocessing.pool.ApplyResult.wait, async_results)
lst_results = [r.get() for r in async_results]
print lst_results
def __del__(self):
self.count -= 1
print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)
def process_obj(self, index):
print "object %d" % index
return "results"
Klass(nobj=8, workers=3)
la sortie montre que, en effet, le constructeur est appelé une fois (dans le pid original) et le destructeur est appelé 9 fois (une fois pour chaque copie faite = 2 ou 3 fois par pool-worker-process si nécessaire, plus une fois dans le processus original). C'est souvent OK, comme dans ce cas, puisque le pickler par défaut fait une copie de l'instance entière et (semi-) la ré-popule secrètement-dans ce cas, en faisant:
obj = object.__new__(Klass)
obj.__dict__.update({'count':1})
-c'est pourquoi, même si le destructeur est appelé huit fois dans les trois processus ouvriers, il compte de 1 à 0 à chaque fois-mais bien sûr, vous pouvez encore avoir des problèmes de cette façon. Si nécessaire, vous pouvez fournir votre propre __setstate__
:
def __setstate__(self, adict):
self.count = adict['count']
dans ce cas, par exemple.
vous pouvez également définir une méthode __call__()
à l'intérieur de votre someClass()
, qui appelle someClass.go()
et passer ensuite une instance de someClass()
au pool. Cet objet est pickleable et il fonctionne bien (pour moi)...
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def go(self):
p = Pool(4)
sc = p.map(self, range(4))
print sc
def __call__(self, x):
return self.f(x)
sc = someClass()
sc.go()
une solution potentiellement triviale consiste à passer à l'utilisation de multiprocessing.dummy
. C'est une implémentation basée sur le thread de l'interface multiprocessing qui ne semble pas avoir ce problème en Python 2.7. Je n'ai pas beaucoup d'expérience ici, mais ce changement rapide d'importation m'a permis d'appeler apply_async sur une méthode de classe.
quelques bonnes ressources sur multiprocessing.dummy
:
https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy
dans ce cas simple ,où someClass.f
n'hérite pas de données de la classe et n'attache rien à la classe, une solution possible serait de séparer f
, de sorte qu'il peut être décapité:
import multiprocessing
def f(x):
return x*x
class someClass(object):
def __init__(self):
pass
def go(self):
pool = multiprocessing.Pool(processes=4)
print pool.map(f, range(10))
pourquoi ne pas utiliser des fonctions séparées?
def func(*args, **kwargs):
return inst.method(args, kwargs)
print pool.map(func, arr)
mise à jour: à partir du jour de cette Écriture, les tuples de noms sont sélectionnables (à partir de python 2.7)
le problème ici est que les processus enfants ne sont pas en mesure d'importer la classe de l'objet-dans ce cas, la classe P -, dans le cas d'un projet multi-modèle la classe P devrait être important partout où le processus enfant obtenir utilisé""
une solution rapide est de le rendre importable en l'affectant à globals ()
globals()["P"] = P
la solution de parisjohn ci-dessus fonctionne très bien avec moi. Plus le code est propre et facile à comprendre. Dans mon cas, il y a quelques fonctions à appeler en utilisant Pool, donc j'ai modifié le code de parisjohn un peu plus bas. J'ai fait appeler pour pouvoir appeler plusieurs fonctions, et les noms de fonction sont passés dans l'argument dict de go()
:
from multiprocessing import Pool
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def g(self, x):
return x*x+1
def go(self):
p = Pool(4)
sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
print sc
def __call__(self, x):
if x["func"]=="f":
return self.f(x["v"])
if x["func"]=="g":
return self.g(x["v"])
sc = someClass()
sc.go()