Utilisez le tableau de numpy dans la mémoire partagée pour le multiprocessing

je voudrais utiliser un tableau numpy dans la mémoire partagée pour une utilisation avec le module multiprocessing. La difficulté est de l'utiliser comme un tableau numpy, et pas seulement comme un ctypes tableau.

from multiprocessing import Process, Array
import scipy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    arr = Array('d', unshared_arr)
    print "Originally, the first two elements of arr = %s"%(arr[:2])

    # Create, start, and finish the child processes
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Printing out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

produit une sortie telle que:

Originally, the first two elements of arr = [0.3518653236697369, 0.517794725524976]
Now, the first two elements of arr = [-0.3518653236697369, 0.517794725524976]

le tableau peut être accédé de manière ctypes, par exemple arr[i] est logique. Cependant, il ne s'agit pas d'un tableau vide , et je ne peux pas effectuer des opérations telles que -1*arr , ou arr.sum() . Je suppose une solution serait de convertir le tableau ctypes en un tableau numpy. Cependant (en plus de ne pas être en mesure de faire ce travail), je ne crois pas qu'il serait partagée plus.

il semble qu'il y aurait une solution standard à ce qui doit être un problème commun.

75
demandé sur Praveen 2011-10-25 23:34:31

5 réponses

à ajouter aux réponses de @unutbu (plus disponible) et @Henry Gomersall. Vous pouvez utiliser shared_arr.get_lock() pour synchroniser l'accès en cas de besoin:

shared_arr = mp.Array(ctypes.c_double, N)
# ...
def f(i): # could be anything numpy accepts as an index such another numpy array
    with shared_arr.get_lock(): # synchronize access
        arr = np.frombuffer(shared_arr.get_obj()) # no data copying
        arr[i] = -arr[i]

exemple

import ctypes
import logging
import multiprocessing as mp

from contextlib import closing

import numpy as np

info = mp.get_logger().info

def main():
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    # create shared array
    N, M = 100, 11
    shared_arr = mp.Array(ctypes.c_double, N)
    arr = tonumpyarray(shared_arr)

    # fill with random values
    arr[:] = np.random.uniform(size=N)
    arr_orig = arr.copy()

    # write to arr from different processes
    with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
        # many processes access the same slice
        stop_f = N // 10
        p.map_async(f, [slice(stop_f)]*M)

        # many processes access different slices of the same array
        assert M % 2 # odd
        step = N // 10
        p.map_async(g, [slice(i, i + step) for i in range(stop_f, N, step)])
    p.join()
    assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig)

def init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_ # must be inherited, not passed as an argument

def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())

def f(i):
    """synchronized."""
    with shared_arr.get_lock(): # synchronize access
        g(i)

def g(i):
    """no synchronization."""
    info("start %s" % (i,))
    arr = tonumpyarray(shared_arr)
    arr[i] = -1 * arr[i]
    info("end   %s" % (i,))

if __name__ == '__main__':
    mp.freeze_support()
    main()

si vous n'avez pas besoin d'accès synchronisé ou si vous créez vos propres serrures, mp.Array() n'est pas nécessaire. Vous pouvez utiliser mp.sharedctypes.RawArray dans ce cas.

66
répondu jfs 2018-01-31 15:46:27

l'objet Array a une méthode get_obj() qui lui est associée, qui renvoie le tableau ctypes qui présente une interface buffer. Je pense que la suite devrait fonctionner...

from multiprocessing import Process, Array
import scipy
import numpy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    a = Array('d', unshared_arr)
    print "Originally, the first two elements of arr = %s"%(a[:2])

    # Create, start, and finish the child process
    p = Process(target=f, args=(a,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%a[:2]

    b = numpy.frombuffer(a.get_obj())

    b[0] = 10.0
    print a[0]

lors de l'exécution, cela imprime le premier élément de a étant maintenant 10.0, montrant a et b sont juste deux vues dans la même mémoire.

afin de s'assurer qu'il est toujours multiprocesseur sûr, je crois que vous doivent utiliser les méthodes acquire et release qui existent sur l'objet Array , a , et son verrouillage intégré pour s'assurer de son accès tout en toute sécurité (bien que je ne suis pas un expert sur le module multiprocesseur).

15
répondu Henry Gomersall 2011-10-26 19:26:23

bien que les réponses déjà données soient bonnes, il y a une solution beaucoup plus facile à ce problème à condition que deux conditions soient remplies:

  1. vous êtes sur un système D'exploitation compatible POSIX (par exemple Linux, Mac OSX); et
  2. vos processus enfant ont besoin de accès en lecture seule au tableau partagé.

Dans ce cas, vous n'avez pas besoin de tripoter explicitement les variables partagées, car les processus enfants seront créés à l'aide d'une fourchette. Un enfant fourchu partage automatiquement l'espace mémoire du parent. Dans le contexte de Python multiprocessing, cela signifie qu'il partage toutes les variables au niveau du module ; notez que ce ne contient pas pour les arguments que vous passez explicitement aux processus de votre enfant ou aux fonctions que vous appelez sur un multiprocessing.Pool ou ainsi.

un exemple simple:

import multiprocessing
import numpy as np

# will hold the (implicitly mem-shared) data
data_array = None

# child worker function
def job_handler(num):
    # built-in id() returns unique memory ID of a variable
    return id(data_array), np.sum(data_array)

def launch_jobs(data, num_jobs=5, num_worker=4):
    global data_array
    data_array = data

    pool = multiprocessing.Pool(num_worker)
    return pool.map(job_handler, range(num_jobs))

# create some random data and execute the child jobs
mem_ids, sumvals = zip(*launch_jobs(np.random.rand(10)))

# this will print 'True' on POSIX OS, since the data was shared
print(np.all(np.asarray(mem_ids) == id(data_array)))
12
répondu EelkeSpaak 2016-06-10 11:20:24

j'ai écrit un petit module python qui utilise la mémoire partagée de POSIX pour partager des tableaux vides entre les interprètes python. Peut-être que vous trouverez à portée de la main.

https://pypi.python.org/pypi/SharedArray

Voici comment ça marche:

import numpy as np
import SharedArray as sa

# Create an array in shared memory
a = sa.create("test1", 10)

# Attach it as a different array. This can be done from another
# python interpreter as long as it runs on the same computer.
b = sa.attach("test1")

# See how they are actually sharing the same memory block
a[0] = 42
print(b[0])

# Destroying a does not affect b.
del a
print(b[0])

# See how "test1" is still present in shared memory even though we
# destroyed the array a.
sa.list()

# Now destroy the array "test1" from memory.
sa.delete("test1")

# The array b is not affected, but once you destroy it then the
# data are lost.
print(b[0])
9
répondu mat 2015-10-22 10:22:07

vous pouvez utiliser le module sharedmem : https://bitbucket.org/cleemesser/numpy-sharedmem

Voici votre code original alors, cette fois en utilisant la mémoire partagée qui se comporte comme un tableau de NumPy (notez la dernière déclaration supplémentaire appelant une fonction de NumPy sum() ):

from multiprocessing import Process
import sharedmem
import scipy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    arr = sharedmem.empty(N)
    arr[:] = unshared_arr.copy()
    print "Originally, the first two elements of arr = %s"%(arr[:2])

    # Create, start, and finish the child process
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

    # Perform some NumPy operation
    print arr.sum()
8
répondu Velimir Mlaker 2013-05-25 19:27:16