Objets mémoire partagée en multiprocesseur

supposons que j'ai un grand tableau numpy en mémoire, j'ai une fonction func qui prend dans ce tableau géant comme entrée (avec quelques autres paramètres). func avec différents paramètres peut être exécuté en parallèle. Par exemple:

def func(arr, param):
    # do stuff to arr, param

# build array arr

pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]

si j'utilise la bibliothèque multiprocessing, alors ce tableau géant sera copié plusieurs fois dans des processus différents.

y a-t-il un moyen de laisser différents processus partager le même tableau? Ce tableau objet est en lecture seule et ne sera jamais modifié.

Qu'est-ce qui est plus compliqué, si arr n'est pas un tableau, mais un objet Python arbitraire, y a-t-il un moyen de le partager?

[EDITED]

j'ai lu la réponse, mais je suis encore un peu confus. Puisque fork () est copy-on-write, nous ne devrions pas invoquer de coût supplémentaire lors de la création de nouveaux processus dans la bibliothèque multiprocessing de python. Mais le code suivant suggère qu'il y a une énorme au-dessus de la tête:

from multiprocessing import Pool, Manager
import numpy as np; 
import time

def f(arr):
    return len(arr)

t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;


pool = Pool(processes = 6)

t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;

sortie (et, soit dit en passant, le coût augmente avec la taille du tableau augmente, donc je soupçonne qu'Il ya encore des frais généraux liés à la copie de mémoire):

construct array =  0.0178790092468
multiprocessing overhead =  0.252444982529

Pourquoi y a-t-il un tel plafond, si nous n'avons pas copié le tableau? Et quelle partie de la mémoire partagée me sauve?

82
demandé sur martineau 2012-05-23 18:20:21

2 réponses

si vous utilisez un système d'exploitation qui utilise la sémantique" copy-on-write fork() (comme n'importe quel unix commun), alors tant que vous ne modifiez jamais votre structure de données, il sera disponible pour tous les processus enfants sans prendre la mémoire supplémentaire. Vous n'aurez pas à faire quoi que ce soit de spécial (sauf s'assurer absolument que vous ne modifiez pas l'objet).

Le plus efficace chose vous peut faire pour votre problème serait d' empaquetez votre tableau dans une structure de tableau efficace (en utilisant numpy ou array ), mettez ça dans une mémoire partagée, enveloppez-le de multiprocessing.Array , et passez ça à vos fonctions. cette réponse montre comment faire que .

Si vous voulez un inscriptible objet partagé, alors vous aurez besoin pour l'envelopper avec une sorte de synchronisation ou de verrouillage. multiprocessing fournit deux méthodes de faire ce : l'un utilisant la mémoire partagée (adapté pour les valeurs simples, tableaux, ou ctypes) ou un Manager proxy, où un processus détient la mémoire et un gestionnaire arbitre l'accès à celle-ci à partir d'autres processus (même sur un réseau).

l'approche Manager peut être utilisée avec des objets Python arbitraires, mais sera plus lente que l'équivalent en utilisant la mémoire partagée parce que les objets doivent être sérialisés/désérialisés et envoyés entre les processus.

il existe une richesse de bibliothèques de traitement en parallèle et des approches disponibles en Python . multiprocessing est une bibliothèque excellente et bien équilibrée, mais si vous avez des besoins spéciaux peut-être une des autres approches peut être mieux.

79
répondu Francis Avila 2017-05-23 12:18:11

j'ai rencontré le même problème et j'ai écrit une petite classe d'utilitaire de mémoire partagée pour l'contourner.

j'utilise le multiprocesseur.RawArray (lockfree), et aussi l'accès aux tableaux n'est pas du tout synchronisé (lockfree), faites attention à ne pas tirer sur vos propres pieds.

avec la solution j'obtiens des vitesses d'un facteur d'environ 3 sur un quad-core i7.

voici le code: N'hésitez pas à utiliser et l'améliorer, et merci de faire un rapport tout les bugs.

'''
Created on 14.05.2013

@author: martin
'''

import multiprocessing
import ctypes
import numpy as np

class SharedNumpyMemManagerError(Exception):
    pass

'''
Singleton Pattern
'''
class SharedNumpyMemManager:    

    _initSize = 1024

    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(SharedNumpyMemManager, cls).__new__(
                                cls, *args, **kwargs)
        return cls._instance        

    def __init__(self):
        self.lock = multiprocessing.Lock()
        self.cur = 0
        self.cnt = 0
        self.shared_arrays = [None] * SharedNumpyMemManager._initSize

    def __createArray(self, dimensions, ctype=ctypes.c_double):

        self.lock.acquire()

        # double size if necessary
        if (self.cnt >= len(self.shared_arrays)):
            self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)

        # next handle
        self.__getNextFreeHdl()        

        # create array in shared memory segment
        shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))

        # convert to numpy array vie ctypeslib
        self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)

        # do a reshape for correct dimensions            
        # Returns a masked array containing the same data, but with a new shape.
        # The result is a view on the original array
        self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions)

        # update cnt
        self.cnt += 1

        self.lock.release()

        # return handle to the shared memory numpy array
        return self.cur

    def __getNextFreeHdl(self):
        orgCur = self.cur
        while self.shared_arrays[self.cur] is not None:
            self.cur = (self.cur + 1) % len(self.shared_arrays)
            if orgCur == self.cur:
                raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')

    def __freeArray(self, hdl):
        self.lock.acquire()
        # set reference to None
        if self.shared_arrays[hdl] is not None: # consider multiple calls to free
            self.shared_arrays[hdl] = None
            self.cnt -= 1
        self.lock.release()

    def __getArray(self, i):
        return self.shared_arrays[i]

    @staticmethod
    def getInstance():
        if not SharedNumpyMemManager._instance:
            SharedNumpyMemManager._instance = SharedNumpyMemManager()
        return SharedNumpyMemManager._instance

    @staticmethod
    def createArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)

    @staticmethod
    def getArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)

    @staticmethod    
    def freeArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)

# Init Singleton on module load
SharedNumpyMemManager.getInstance()

if __name__ == '__main__':

    import timeit

    N_PROC = 8
    INNER_LOOP = 10000
    N = 1000

    def propagate(t):
        i, shm_hdl, evidence = t
        a = SharedNumpyMemManager.getArray(shm_hdl)
        for j in range(INNER_LOOP):
            a[i] = i

    class Parallel_Dummy_PF:

        def __init__(self, N):
            self.N = N
            self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)            
            self.pool = multiprocessing.Pool(processes=N_PROC)

        def update_par(self, evidence):
            self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))

        def update_seq(self, evidence):
            for i in range(self.N):
                propagate((i, self.arrayHdl, evidence))

        def getArray(self):
            return SharedNumpyMemManager.getArray(self.arrayHdl)

    def parallelExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_par(5)
        print(pf.getArray())

    def sequentialExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_seq(5)
        print(pf.getArray())

    t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
    t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")

    print("Sequential: ", t1.timeit(number=1))    
    print("Parallel: ", t2.timeit(number=1))
13
répondu martin.preinfalk 2015-04-28 01:45:32