Partager un objet complexe entre des processus Python?
j'ai un objet Python assez complexe que je dois partager entre plusieurs processus. Je lance ces processus en utilisant multiprocessing.Process
. Quand je partage un objet avec multiprocessing.Queue
et multiprocessing.Pipe
, ils sont très bien partagés. Mais quand j'essaie de partager un objet avec d'autres objets non-multi-processing-module, il semble que Python bifurque ces objets. Est-ce vrai?
j'ai essayé le multiprocessing.Valeur. Mais je ne suis pas sûr de ce que devrait être le type? Mon objet la classe s'appelle MyClass. Mais quand j'essaie multiprocess.Value(MyClass, instance)
, il échoue avec:
TypeError: this type has no size
une idée de ce qui se passe?
5 réponses
vous pouvez le faire en utilisant les classes Multiprocessing" Manager " de Python et une classe proxy que vous définissez. De la part de Python docs: http://docs.python.org/library/multiprocessing.html#proxy-objects
ce que vous voulez faire est de définir une classe proxy pour votre objet personnalisé, puis de partager l'objet en utilisant un" gestionnaire à distance "-- regardez les exemples dans la même page doc liée pour" gestionnaire à distance " où les docs montrent comment partager une file d'attente à distance. Vous allez faire la même chose, mais votre appel à your_manager_instance.register () inclura votre classe de proxy personnalisée dans sa liste d'arguments.
de cette façon, vous configurez un serveur pour partager l'objet personnalisé avec un proxy personnalisé. Vos clients ont besoin d'accéder au serveur (encore une fois, voir les excellents exemples de documentation sur la façon de configurer l'accès client/serveur à une file d'attente distante, mais au lieu de partager une file d'attente, vous partagez l'accès à votre classe spécifique).
après beaucoup de recherches et d'essais, j'ai trouvé" Manager "faire ce travail dans un non-complexe niveau objet.
le code ci-dessous indique que l'objet inst
est partagé entre les processus, ce qui signifie que la propriété var
de inst
est changée à l'extérieur lorsque le processus enfant le change.
from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager
class SimpleClass(object):
def __init__(self):
self.var = 0
def set(self, value):
self.var = value
def get(self):
return self.var
def change_obj_value(obj):
obj.set(100)
if __name__ == '__main__':
BaseManager.register('SimpleClass', SimpleClass)
manager = BaseManager()
manager.start()
inst = manager.SimpleClass()
p = Process(target=change_obj_value, args=[inst])
p.start()
p.join()
print inst # <__main__.SimpleClass object at 0x10cf82350>
print inst.get() # 100
ok, le code ci-dessus est assez si vous avez seulement besoin de partager objets simples .
pourquoi pas complexe? Parce que il peut échouer si votre objet est emboîté (objet à l'intérieur de l'objet):
from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager
class GetSetter(object):
def __init__(self):
self.var = None
def set(self, value):
self.var = value
def get(self):
return self.var
class ChildClass(GetSetter):
pass
class ParentClass(GetSetter):
def __init__(self):
self.child = ChildClass()
GetSetter.__init__(self)
def getChild(self):
return self.child
def change_obj_value(obj):
obj.set(100)
obj.getChild().set(100)
if __name__ == '__main__':
BaseManager.register('ParentClass', ParentClass)
manager = BaseManager()
manager.start()
inst2 = manager.ParentClass()
p2 = Process(target=change_obj_value, args=[inst2])
p2.start()
p2.join()
print inst2 # <__main__.ParentClass object at 0x10cf82350>
print inst2.getChild() # <__main__.ChildClass object at 0x10cf6dc50>
print inst2.get() # 100
#good!
print inst2.getChild().get() # None
#bad! you need to register child class too but there's almost no way to do it
#even if you did register child class, you may get PicklingError :)
je pense que la principale raison de ce comportement est parce que Manager
est juste une construction candybar sur le dessus des outils de communication de bas niveau comme pipe/queue.
donc, cette approche est pas bien recommandé pour le cas de traitement multiple. C'est toujours mieux si vous pouvez utiliser outils de bas niveau comme lock / semaphore/pipe/queue ou outils de haut niveau comme Redis queue ou Redis publish / subscribe pour cas d'utilisation compliquée (seulement ma recommandation lol).
voici un paquet python que j'ai fait juste pour ça (partager des objets complexes entre des processus).
git: https://github.com/dRoje/pipe-proxy
L'idée est de créer un proxy de votre objet et de le transmettre à un processus. Ensuite, vous utilisez le proxy comme vous avez une référence à l'objet d'origine. Bien que vous ne pouvez utiliser que des appels de méthode, donc l'accès aux variables d'objet se fait avec des setters et des getters lancés.
disons que nous avons un objet appelé "exemple’, la création de proxy et de l'auditeur proxy est facile:
from pipeproxy import proxy
example = Example()
exampleProxy, exampleProxyListener = proxy.createProxy(example)
maintenant vous envoyez le mandataire à un autre processus.
p = Process(target=someMethod, args=(exampleProxy,)) p.start()
utilisez-le dans l'autre processus comme vous utiliseriez l'objet original (exemple):
def someMethod(exampleProxy):
...
exampleProxy.originalExampleMethod()
...
mais vous devez l'écouter dans le processus principal:
exampleProxyListener.listen()
lire plus et trouver des exemples ici:
http://matkodjipalo.com/index.php/2017/11/12/proxy-solution-python-multiprocessing /
j'ai essayé D'utiliser BaseManager et d'Enregistrer ma classe personnalisée pour la rendre heureuse, et obtenir le problème de classe imbriquée comme Tom avait mentionné ci-dessus.
je pense que la raison principale n'est pas pertinente pour la classe imbriquée comme dit, mais le mécanisme de communication que python prendre en bas niveau. La raison en est que python utilise un mécanisme de communication similaire à celui d'une socket pour synchroniser la modification de la classe personnalisée au sein d'un processus serveur de bas niveau. Je pense que c' encapsuler certaines méthodes rpc, le rendre juste transparent à l'utilisateur comme s'ils appelaient les méthodes locales d'un objet de classe imbriqué.
Ainsi, lorsque vous voulez modifier, récupérer vos objets auto-définis ou certains objets tiers, vous devriez définir certaines interfaces dans vos processus pour communiquer avec elle plutôt que directement obtenir ou définir des valeurs.
Pourtant, lors de l'exploitation des objets multi-imbriqués dans les objets imbriqués, on peut ignorer les problèmes mentionnés ci-dessus, tout comme ce que vous faites dans votre routine commune parce que vos objets imbriqués dans la classe enregistrée ne sont plus des objets proxy, sur lesquels l'opération ne passera pas à nouveau la routine de communication socket-alike et est localisé.
voici le code que j'ai écrit pour résoudre le problème.
from multiprocessing import Process, Manager, Lock
from multiprocessing.managers import BaseManager
import numpy as np
class NestedObj(object):
def __init__(self):
self.val = 1
class CustomObj(object):
def __init__(self, numpy_obj):
self.numpy_obj = numpy_obj
self.nested_obj = NestedObj()
def set_value(self, p, q, v):
self.numpy_obj[p, q] = v
def get_obj(self):
return self.numpy_obj
def get_nested_obj(self):
return self.nested_obj.val
class CustomProcess(Process):
def __init__(self, obj, p, q, v):
super(CustomProcess, self).__init__()
self.obj = obj
self.index = p, q
self.v = v
def run(self):
self.obj.set_value(*self.index, self.v)
if __name__=="__main__":
BaseManager.register('CustomObj', CustomObj)
manager = BaseManager()
manager.start()
data = [[0 for x in range(10)] for y in range(10)]
matrix = np.matrix(data)
custom_obj = manager.CustomObj(matrix)
print(custom_obj.get_obj())
process_list = []
for p in range(10):
for q in range(10):
proc = CustomProcess(custom_obj, p, q, 10*p+q)
process_list.append(proc)
for x in range(100):
process_list[x].start()
for x in range(100):
process_list[x].join()
print(custom_obj.get_obj())
print(custom_obj.get_nested_obj())
pour sauver quelques maux de tête avec des ressources partagées, vous pouvez essayer de recueillir des données qui ont besoin d'accéder à une ressource singleton dans une déclaration de retour de la fonction qui est cartographiée par exemple pool.imap_unordered
et ensuite le traiter dans une boucle qui récupère les résultats partiels:
for result in in pool.imap_unordered(process_function, iterable_data):
do_something(result)
si ce n'est pas beaucoup de données qui est retourné, alors il pourrait ne pas y avoir beaucoup de frais généraux à faire cela.