Les Pandas de df.parallélisation iterrow()

<!-Je voudrais mettre en parallèle le code suivant:

for row in df.iterrow():
    idx = row[0]
    k = row[1]['Chromosome']
    start,end = row[1]['Bin'].split('-')

    sequence = sequence_from_coordinates(k,1,start,end) #slow download form http

    df.set_value(idx,'GC%',gc_content(sequence,percent=False,verbose=False))
    df.set_value(idx,'G4 repeats', sum([len(list(i)) for i in g4_scanner(sequence)]))
    df.set_value(idx,'max flexibility',max([item[1] for item in dna_flex(sequence,verbose=False)]))

j'ai essayé d'utiliser multiprocessing.Pool() puisque chaque ligne peut être traitée indépendamment, mais je ne peux pas trouver comment partager la base de données. Je suis également pas sûr que ce soit la meilleure approche pour faire la parallélisation avec les pandas. Toute aide?

12
demandé sur alec_djinn 2016-11-01 12:39:12

2 réponses

@Khris a dit dans son commentaire, vous devez diviser votre dataframe dans quelques gros morceaux et itérer sur chaque morceau en parallèle. Vous pouvez diviser arbitrairement la dataframe en morceaux de taille aléatoire, mais il est plus logique de diviser la dataframe en morceaux de taille égale en fonction du nombre de processus que vous prévoyez d'utiliser. Heureusement, quelqu'un d'autre a déjà trouvé comment faire la partie pour nous:

# don't forget to import
import pandas as pd
import multiprocessing

# create as many processes as there are CPUs on your machine
num_processes = multiprocessing.cpu_count()

# calculate the chunk size as an integer
chunk_size = int(df.shape[0]/num_processes)

# this solution was reworked from the above link.
# will work even if the length of the dataframe is not evenly divisible by num_processes
chunks = [df.ix[df.index[i:i + chunk_size]] for i in range(0, df.shape[0], chunk_size)]

cela crée une liste qui contient nos dataframe en morceaux. Maintenant nous avons besoin de le passer dans notre pool avec une fonction qui va manipuler les données.

def func(d):
   # let's create a function that squares every value in the dataframe
   return d * d

# create our pool with `num_processes` processes
pool = multiprocessing.Pool(processes=num_processes)

# apply our function to each chunk in the list
result = pool.map(func, chunks)

À ce stade, result sera une liste contenant chaque morceau après avoir été manipulé. Dans ce cas, toutes les valeurs ont été carrées. Le problème maintenant est que la base de données originale n'a pas été modifiée, nous devons donc remplacer toutes ses valeurs existantes par les résultats de notre pool.

for i in range(len(result)):
   # since result[i] is just a dataframe
   # we can reassign the original dataframe based on the index of each chunk
   df.ix[result[i].index] = result[i]

Maintenant, ma fonction pour manipuler mon dataframe est vectorisé et aurait probablement été plus rapide si j'avais simplement appliqué à l'intégralité de mon dataframe au lieu de se diviser en morceaux. Cependant, dans votre cas, votre fonction itérer sur chaque ligne de chaque bloc, puis retourner le morceau. Cela vous permet de traiter num_process lignes à la fois.

def func(d):
   for row in d.iterrow():
      idx = row[0]
      k = row[1]['Chromosome']
      start,end = row[1]['Bin'].split('-')

      sequence = sequence_from_coordinates(k,1,start,end) #slow download form http
      d.set_value(idx,'GC%',gc_content(sequence,percent=False,verbose=False))
      d.set_value(idx,'G4 repeats', sum([len(list(i)) for i in g4_scanner(sequence)]))
      d.set_value(idx,'max flexibility',max([item[1] for item in dna_flex(sequence,verbose=False)]))
   # return the chunk!
   return d

puis vous réassignez les valeurs dans la base de données originale, et vous avez réussi à paralléliser ce processus.

Combien De Processus Dois-Je Utilisez?

votre performance optimale va dépendre de la réponse à cette question. Alors que "tous les processus!!!!"est une réponse, une meilleure réponse est beaucoup plus nuancée. Après un certain point, jeter plus de processus à un problème crée en fait plus de frais généraux que cela ne vaut la peine. Ceci est connu comme la loi D'Amdahl. Encore une fois, nous avons la chance que d'autres se soient déjà penchés sur cette question pour nous:

  1. Python multitraitement de la Piscine limite de processus
  2. Combien de processus dois-je exécuter en parallèle?

un bon défaut est d'utiliser multiprocessing.cpu_count(), qui est le comportement par défaut de multiprocessing.Pool. selon la documentation " si les processus sont None, alors le nombre retourné par cpu_count() est utilisé."C'est pourquoi j'ai mis num_processes au début multiprocessing.cpu_count(). De cette façon, si vous vous déplacez vers une machine plus lourde, vous obtenez les avantages de celui-ci sans avoir à changer la num_processes variable directement.

27
répondu TheF1rstPancake 2017-05-23 12:16:29

Un moyen plus rapide (environ 10% dans mon cas):

principales différences par rapport à la réponse acceptée: utilisez pd.concat et np.array_split pour se séparer et rejoindre le dataframre.

import multiprocessing
import numpy as np


def parallelize_dataframe(df, func):
    num_cores = multiprocessing.cpu_count()-1  #leave one free to not freeze machine
    num_partitions = num_cores #number of partitions to split dataframe
    df_split = np.array_split(df, num_partitions)
    pool = multiprocessing.Pool(num_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df

func est la fonction que vous souhaitez appliquer à df. Utilisez partial(func, arg=arg_val) pour plus qu'un seul argument.

9
répondu ic_fl2 2017-08-28 11:51:44