Ne peut pas pickle en utilisant multiprocessing Pool.cartographie()

j'essaie d'utiliser la fonction multiprocessing s Pool.map() pour répartir le travail simultanément. Quand j'utilise le code suivant, ça marche très bien:

import multiprocessing

def f(x):
    return x*x

def go():
    pool = multiprocessing.Pool(processes=4)        
    print pool.map(f, range(10))


if __name__== '__main__' :
    go()

cependant, quand je l'utilise dans une approche plus orientée objet, il ne fonctionne pas. Le message d'erreur qu'il donne est:

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed

cela se produit lorsque le suivant est mon programme principal:

import someClass

if __name__== '__main__' :
    sc = someClass.someClass()
    sc.go()

et la suivante est ma classe someClass :

import multiprocessing

class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(self.f, range(10))

Quelqu'un sait quel pourrait être le problème, ou un moyen facile de le contourner?

182
demandé sur martineau 2009-11-30 01:08:35

11 réponses

le problème est que le multiprocessing doit brosser les choses à les lancer parmi les processus, et les méthodes liées ne sont pas pickable. La solution (que vous le considériez " facile "ou non;-) est d'ajouter l'infrastructure à votre programme pour permettre à de telles méthodes d'être récupérées, en l'enregistrant avec la méthode de bibliothèque standard copy_reg .

par exemple, la contribution de Steven Bethard à ce fil (vers la fin de la thread) montre une approche parfaitement réalisable pour permettre la méthode décapage / unpickling via copy_reg .

104
répondu Alex Martelli 2017-02-28 14:31:05

toutes ces solutions sont moches parce que le multiprocessing et le décapage est cassé et limité à moins que vous sautiez en dehors de la bibliothèque standard.

si vous utilisez une fourchette de multiprocessing appelée pathos.multiprocesssing , vous pouvez utiliser directement les méthodes classes et classes dans les fonctions map de multiprocessing. C'est parce que dill est utilisé au lieu de pickle ou cPickle , et dill peut sérialiser presque tout en python.

voulu faire, en premier lieu, et vous pouvez le faire à partir de l'interprète, si vous voulez.

>>> import pathos.pools as pp
>>> class someClass(object):
...   def __init__(self):
...     pass
...   def f(self, x):
...     return x*x
...   def go(self):
...     pool = pp.ProcessPool(4)
...     print pool.map(self.f, range(10))
... 
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> 

obtenir le code ici: https://github.com/uqfoundation/pathos

62
répondu Mike McKerns 2017-07-07 16:39:20

vous pouvez également définir une méthode __call__() à l'intérieur de votre someClass() , qui appelle someClass.go() et passer ensuite une instance de someClass() au pool. Cet objet est pickleable et il fonctionne bien (pour moi)...

31
répondu dorvak 2011-08-31 22:50:37

Certaines limites bien que Steven Bethard la solution :

quand vous enregistrez votre méthode de classe comme une fonction, le destructeur de votre classe est étonnamment appelé chaque fois que votre traitement de méthode est terminé. Donc, si vous avez une instance de votre classe qui appelle n fois sa méthode, les membres peuvent disparaître entre 2 cycles et vous pouvez obtenir un message malloc: *** error for object 0x...: pointer being freed was not allocated (par exemple ouvrir le fichier membre) ou pure virtual method called, terminate called without an active exception (ce qui signifie que la durée de vie d'un objet membre que j'ai utilisé était plus court que ce que je pensais). J'ai eu ça en traitant avec n plus grand que la taille de la piscine. Voici un bref exemple:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)


class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multi-processing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __del__(self):
        print "... Destructor"

    def process_obj(self, index):
        print "object %d" % index
        return "results"

pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)

sortie:

Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor

la méthode __call__ n'est pas aussi équivalente, parce que [aucun,...] sont lus à partir des résultats:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multiprocessing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __call__(self, i):
        self.process_obj(i)

    def __del__(self):
        print "... Destructor"

    def process_obj(self, i):
        print "obj %d" % i
        return "result"

Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once), 
# **and** results are empty !

Si aucune des deux méthodes n'est satisfaisante...

18
répondu Eric H. 2011-09-12 08:26:49

il y a un autre raccourci que vous pouvez utiliser, bien qu'il puisse être inefficace en fonction de ce qui est dans vos instances de classe.

comme tout le monde l'a dit, le problème est que le code multiprocessing doit pickle les choses qu'il envoie aux sous-processus qu'il a commencé, et le pickler ne fait pas instance-méthodes.

cependant, au lieu d'envoyer la méthode instance, vous pouvez envoyer la classe instance réelle, plus le nom de la fonction à appeler, à une fonction ordinaire qui utilise ensuite getattr pour appeler la méthode instance, créant ainsi la méthode liée dans le sous-processus Pool . Ceci est similaire à la définition d'un __call__ méthode sauf que vous pouvez appeler plus d'une fonction membre.

Stealing @EricH.'s code de sa réponse et l'annotant un peu (je l'ai retapé d'où tous les changements de nom et Tels, pour une raison quelconque cela a semblé plus facile que Couper-Coller : -)) pour l'illustration de toute la magie:

import multiprocessing
import os

def call_it(instance, name, args=(), kwargs=None):
    "indirect caller for instance methods and multiprocessing"
    if kwargs is None:
        kwargs = {}
    return getattr(instance, name)(*args, **kwargs)

class Klass(object):
    def __init__(self, nobj, workers=multiprocessing.cpu_count()):
        print "Constructor (in pid=%d)..." % os.getpid()
        self.count = 1
        pool = multiprocessing.Pool(processes = workers)
        async_results = [pool.apply_async(call_it,
            args = (self, 'process_obj', (i,))) for i in range(nobj)]
        pool.close()
        map(multiprocessing.pool.ApplyResult.wait, async_results)
        lst_results = [r.get() for r in async_results]
        print lst_results

    def __del__(self):
        self.count -= 1
        print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)

    def process_obj(self, index):
        print "object %d" % index
        return "results"

Klass(nobj=8, workers=3)

la sortie montre que, en effet, le constructeur est appelé une fois (dans le pid original) et le destructeur est appelé 9 fois (une fois pour chaque copie faite = 2 ou 3 fois par pool-worker-process si nécessaire, plus une fois dans le processus original). C'est souvent OK, comme dans ce cas, puisque le pickler par défaut fait une copie de l'instance entière et (semi-) la ré-popule secrètement-dans ce cas, en faisant:

obj = object.__new__(Klass)
obj.__dict__.update({'count':1})

-c'est pourquoi, même si le destructeur est appelé huit fois dans les trois processus ouvriers, il compte de 1 à 0 à chaque fois-mais bien sûr, vous pouvez encore avoir des problèmes de cette façon. Si nécessaire, vous pouvez fournir votre propre __setstate__ :

    def __setstate__(self, adict):
        self.count = adict['count']

dans ce cas, par exemple.

11
répondu torek 2012-04-18 20:01:54

vous pouvez également définir une méthode __call__() à l'intérieur de votre someClass() , qui appelle someClass.go() et passer ensuite une instance de someClass() au pool. Cet objet est pickleable et il fonctionne bien (pour moi)...

class someClass(object):
   def __init__(self):
       pass
   def f(self, x):
       return x*x

   def go(self):
      p = Pool(4)
      sc = p.map(self, range(4))
      print sc

   def __call__(self, x):   
     return self.f(x)

sc = someClass()
sc.go()
5
répondu parisjohn 2017-01-31 15:33:15

une solution potentiellement triviale consiste à passer à l'utilisation de multiprocessing.dummy . C'est une implémentation basée sur le thread de l'interface multiprocessing qui ne semble pas avoir ce problème en Python 2.7. Je n'ai pas beaucoup d'expérience ici, mais ce changement rapide d'importation m'a permis d'appeler apply_async sur une méthode de classe.

quelques bonnes ressources sur multiprocessing.dummy :

https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy

http://chriskiehl.com/article/parallelism-in-one-line/

1
répondu David Parks 2016-12-20 05:08:35

dans ce cas simple ,où someClass.f n'hérite pas de données de la classe et n'attache rien à la classe, une solution possible serait de séparer f , de sorte qu'il peut être décapité:

import multiprocessing


def f(x):
    return x*x


class someClass(object):
    def __init__(self):
        pass

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(f, range(10))
0
répondu mhh 2018-03-23 18:14:11

pourquoi ne pas utiliser des fonctions séparées?

def func(*args, **kwargs):
    return inst.method(args, kwargs)

print pool.map(func, arr)
0
répondu 0script0 2018-05-17 14:33:11

mise à jour: à partir du jour de cette Écriture, les tuples de noms sont sélectionnables (à partir de python 2.7)

le problème ici est que les processus enfants ne sont pas en mesure d'importer la classe de l'objet-dans ce cas, la classe P -, dans le cas d'un projet multi-modèle la classe P devrait être important partout où le processus enfant obtenir utilisé""

une solution rapide est de le rendre importable en l'affectant à globals ()

globals()["P"] = P
0
répondu rachid el kedmiri 2018-06-11 13:27:12

la solution de parisjohn ci-dessus fonctionne très bien avec moi. Plus le code est propre et facile à comprendre. Dans mon cas, il y a quelques fonctions à appeler en utilisant Pool, donc j'ai modifié le code de parisjohn un peu plus bas. J'ai fait appeler pour pouvoir appeler plusieurs fonctions, et les noms de fonction sont passés dans l'argument dict de go() :

from multiprocessing import Pool
class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def g(self, x):
        return x*x+1    

    def go(self):
        p = Pool(4)
        sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
        print sc

    def __call__(self, x):
        if x["func"]=="f":
            return self.f(x["v"])
        if x["func"]=="g":
            return self.g(x["v"])        

sc = someClass()
sc.go()
0
répondu neobot 2018-06-27 14:10:21