Appliquer efficacement une fonction à une DataFrame pandas groupée en parallèle
j'ai souvent besoin d'appliquer une fonction à l'groupes d'une très grande DataFrame
(des types de données mélangés) et voudrais profiter de plusieurs cœurs.
je peux créer un itérateur à partir des groupes et utiliser le module multiprocessing, mais il n'est pas efficace parce que chaque groupe et les résultats de la fonction doivent être choisis pour la messagerie entre les processus.
y a-t-il un moyen d'éviter le décapage ou même la copie DataFrame
complètement? Il semble que les fonctions de mémoire partagée des modules multiprocesseurs soient limitées aux tableaux numpy
. Existe-il d'autres options?
1 réponses
D'après les commentaires ci-dessus, il semble que cela soit prévu pour pandas
un certain temps (il y a aussi un intéressant rosetta
projet que je viens de remarquer).
cependant, jusqu'à ce que chaque fonctionnalité parallèle soit incorporée dans pandas
, j'ai remarqué qu'il est très facile d'écrire des augmentations parallèles efficaces et sans mémoire de copie à pandas
directement en utilisant cython
+ OpenMP et C++.
voici un court exemple d'écriture d'une somme de groupe parallèle, dont l'utilisation est quelque chose comme ceci:
import pandas as pd
import para_group_demo
df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print para_group_demo.sum(df.a, df.b)
et sortie est:
sum
key
0 6
1 11
2 4
Note sans doute, la fonctionnalité de cet exemple simple fera éventuellement partie de pandas
. Certaines choses, cependant, seront plus naturelles à paralléliser en C++ pendant un certain temps, et il est important de savoir à quel point il est facile de combiner cela en pandas
.
pour ce faire, j'ai écrit une simple extension de fichier source dont le code suit.
il commence par quelques importations et définitions de type
from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map
cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange
import pandas as pd
ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t
le type c++ unordered_map
est pour la sommation par un seul thread, et le vector
est pour la sommation par tous les threads.
maintenant à la la fonction sum
. Il commence avec vues de mémoire dactylographiées pour l'accès rapide:
def sum(crit, vals):
cdef int64_t[:] crit_view = crit.values
cdef int64_t[:] vals_view = vals.values
la fonction continue en divisant le semi-également aux fils (ici codé en dur à 4), et ayant chaque fil font la somme des entrées dans sa gamme:
cdef uint64_t num_threads = 4
cdef uint64_t l = len(crit)
cdef uint64_t s = l / num_threads + 1
cdef uint64_t i, j, e
cdef counts_vec_t counts
counts = counts_vec_t(num_threads)
counts.resize(num_threads)
with cython.boundscheck(False):
for i in prange(num_threads, nogil=True):
j = i * s
e = j + s
if e > l:
e = l
while j < e:
counts[i][crit_view[j]] += vals_view[j]
inc(j)
lorsque les threads ont terminé, la fonction fusionne tous les résultats (à partir des différentes gammes) dans un seul unordered_map
:
cdef counts_t total
cdef counts_it_t it, e_it
for i in range(num_threads):
it = counts[i].begin()
e_it = counts[i].end()
while it != e_it:
total[deref(it).first] += deref(it).second
inc(it)
il ne reste plus qu'à créer un DataFrame
et retourner les résultats:
key, sum_ = [], []
it = total.begin()
e_it = total.end()
while it != e_it:
key.append(deref(it).first)
sum_.append(deref(it).second)
inc(it)
df = pd.DataFrame({'key': key, 'sum': sum_})
df.set_index('key', inplace=True)
return df