Appliquer efficacement une fonction à une DataFrame pandas groupée en parallèle

j'ai souvent besoin d'appliquer une fonction à l'groupes d'une très grande DataFrame (des types de données mélangés) et voudrais profiter de plusieurs cœurs.

je peux créer un itérateur à partir des groupes et utiliser le module multiprocessing, mais il n'est pas efficace parce que chaque groupe et les résultats de la fonction doivent être choisis pour la messagerie entre les processus.

y a-t-il un moyen d'éviter le décapage ou même la copie DataFrame complètement? Il semble que les fonctions de mémoire partagée des modules multiprocesseurs soient limitées aux tableaux numpy . Existe-il d'autres options?

88
demandé sur MaxU 2012-07-31 00:08:57

1 réponses

D'après les commentaires ci-dessus, il semble que cela soit prévu pour pandas un certain temps (il y a aussi un intéressant rosetta projet que je viens de remarquer).

cependant, jusqu'à ce que chaque fonctionnalité parallèle soit incorporée dans pandas , j'ai remarqué qu'il est très facile d'écrire des augmentations parallèles efficaces et sans mémoire de copie à pandas directement en utilisant cython + OpenMP et C++.

voici un court exemple d'écriture d'une somme de groupe parallèle, dont l'utilisation est quelque chose comme ceci:

import pandas as pd
import para_group_demo

df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print para_group_demo.sum(df.a, df.b)

et sortie est:

     sum
key     
0      6
1      11
2      4

Note sans doute, la fonctionnalité de cet exemple simple fera éventuellement partie de pandas . Certaines choses, cependant, seront plus naturelles à paralléliser en C++ pendant un certain temps, et il est important de savoir à quel point il est facile de combiner cela en pandas .


pour ce faire, j'ai écrit une simple extension de fichier source dont le code suit.

il commence par quelques importations et définitions de type

from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map

cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange

import pandas as pd

ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t

le type c++ unordered_map est pour la sommation par un seul thread, et le vector est pour la sommation par tous les threads.

maintenant à la la fonction sum . Il commence avec vues de mémoire dactylographiées pour l'accès rapide:

def sum(crit, vals):
    cdef int64_t[:] crit_view = crit.values
    cdef int64_t[:] vals_view = vals.values

la fonction continue en divisant le semi-également aux fils (ici codé en dur à 4), et ayant chaque fil font la somme des entrées dans sa gamme:

    cdef uint64_t num_threads = 4
    cdef uint64_t l = len(crit)
    cdef uint64_t s = l / num_threads + 1
    cdef uint64_t i, j, e
    cdef counts_vec_t counts
    counts = counts_vec_t(num_threads)
    counts.resize(num_threads)
    with cython.boundscheck(False):
        for i in prange(num_threads, nogil=True): 
            j = i * s
            e = j + s
            if e > l:
                e = l
            while j < e:
                counts[i][crit_view[j]] += vals_view[j]
                inc(j)

lorsque les threads ont terminé, la fonction fusionne tous les résultats (à partir des différentes gammes) dans un seul unordered_map :

    cdef counts_t total
    cdef counts_it_t it, e_it
    for i in range(num_threads):
        it = counts[i].begin()
        e_it = counts[i].end()
        while it != e_it:
            total[deref(it).first] += deref(it).second
            inc(it)        

il ne reste plus qu'à créer un DataFrame et retourner les résultats:

    key, sum_ = [], []
    it = total.begin()
    e_it = total.end()
    while it != e_it:
        key.append(deref(it).first)
        sum_.append(deref(it).second)
        inc(it)

    df = pd.DataFrame({'key': key, 'sum': sum_})
    df.set_index('key', inplace=True)
    return df
12
répondu Ami Tavory 2015-05-21 10:18:25