python dask DataFrame, support (trivialement parallélisables) ligne à appliquer?
j'ai récemment trouvé DASK module qui vise à être un python facile à utiliser le module de traitement en parallèle. Pour moi, ça marche avec les pandas.
après avoir lu un peu sur sa page de manuel, Je ne trouve pas le moyen de faire cette tâche trivialement parallélisable:
ts.apply(func) # for pandas series
df.apply(func, axis = 1) # for pandas DF row apply
À l'heure actuelle, pour atteindre cet objectif dans dask, autant que je sache,
ddf.assign(A=lambda df: df.apply(func, axis=1)).compute() # dask DataFrame
qui est une syntaxe laide et qui est en fait plus lente que pure et simple
df.apply(func, axis = 1) # for pandas DF row apply
une suggestion?
Edit: Merci @MRocklin pour la fonction map. Il semble être plus lent que les pandas simples appliquent. Est-ce lié au problème de publication de pandas GIL ou est-ce que je le fais mal?
import dask.dataframe as dd
s = pd.Series([10000]*120)
ds = dd.from_pandas(s, npartitions = 3)
def slow_func(k):
A = np.random.normal(size = k) # k = 10000
s = 0
for a in A:
if a > 0:
s += 1
else:
s -= 1
return s
s.apply(slow_func) # 0.43 sec
ds.map(slow_func).compute() # 2.04 sec
2 réponses
map_partitions
vous pouvez appliquer votre fonction à toutes les partitions de votre dataframe avec la fonction map_partitions
.
df.map_partitions(func, columns=...)
notez que func ne sera donné qu'une partie de l'ensemble de données à la fois, pas l'ensemble de données entier comme avec pandas apply
(que vous ne voudriez probablement pas si vous voulez faire du parallélisme.)
map
/ apply
Vous pouvez mapper une fonction rang-wise à travers une série avec map
df.mycolumn.map(func)
vous pouvez cartographier une fonction par ligne à travers une dataframe avec apply
df.apply(func, axis=1)
Threads vs process
à partir de la version 0.6.0 dask.dataframes
correspond à des threads. Les fonctions Python personnalisées ne bénéficieront pas beaucoup du parallélisme basé sur le thread. Vous pourriez essayer des processus à la place
df = dd.read_csv(...)
from dask.multiprocessing import get
df.map_partitions(func, columns=...).compute(get=get)
mais éviter apply
cependant, vous devriez vraiment éviter apply
avec des fonctions Python personnalisées, à la fois dans Pandas et en Dask. C'est souvent une source de mauvaise performance. Il se peut que si vous trouvez un moyen de faire votre opération de manière vectorisée, alors il se peut que votre code Pandas soit 100 fois plus rapide et que vous n'ayez pas besoin de dask.dataframe à tous.
prendre en considération numba
pour votre problème particulier, vous pourriez considérer numba
. Cela améliore considérablement vos performances.
In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: s = pd.Series([10000]*120)
In [4]: %paste
def slow_func(k):
A = np.random.normal(size = k) # k = 10000
s = 0
for a in A:
if a > 0:
s += 1
else:
s -= 1
return s
## -- End pasted text --
In [5]: %time _ = s.apply(slow_func)
CPU times: user 345 ms, sys: 3.28 ms, total: 348 ms
Wall time: 347 ms
In [6]: import numba
In [7]: fast_func = numba.jit(slow_func)
In [8]: %time _ = s.apply(fast_func) # First time incurs compilation overhead
CPU times: user 179 ms, sys: 0 ns, total: 179 ms
Wall time: 175 ms
In [9]: %time _ = s.apply(fast_func) # Subsequent times are all gain
CPU times: user 68.8 ms, sys: 27 µs, total: 68.8 ms
Wall time: 68.7 ms
Disclaimer, je travaille pour la société qui fait à la fois numba
et dask
et emploie de nombreux pandas
développeurs.
à partir de v dask.dataframe
.appliquer la responsabilité des délégués à map_partitions
:
@insert_meta_param_description(pad=12)
def apply(self, func, convert_dtype=True, meta=no_default, args=(), **kwds):
""" Parallel version of pandas.Series.apply
...
"""
if meta is no_default:
msg = ("`meta` is not specified, inferred from partial data. "
"Please provide `meta` if the result is unexpected.\n"
" Before: .apply(func)\n"
" After: .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result\n"
" or: .apply(func, meta=('x', 'f8')) for series result")
warnings.warn(msg)
meta = _emulate(M.apply, self._meta_nonempty, func,
convert_dtype=convert_dtype,
args=args, **kwds)
return map_partitions(M.apply, self, func,
convert_dtype, args, meta=meta, **kwds)