Comment résoudre les problèmes de mémoire tout en multiprocessing en utilisant Pool.map()?

j'ai écrit le programme (ci-dessous):

  • lire un énorme fichier texte pandas dataframe
  • groupby en utilisant une valeur de colonne spécifique pour séparer les données et les stocker en tant que liste d'images de données.
  • puis passez les données àmultiprocess Pool.map() pour traiter chaque dataframe en parallèle.

Tout va bien, le programme fonctionne bien sur mon petit ensemble de données de test. Mais, quand je pipe dans mes grandes données (environ 14 Go), la consommation de mémoire exponentiellement augmente puis gèle l'ordinateur ou se fait tuer (dans le cluster HPC).

j'ai ajouté des codes pour effacer la mémoire dès que la donnée/variable n'est pas utile. Je ferme aussi la piscine dès que c'est fait. Toujours avec une entrée de 14 GO, Je ne m'attendais qu'à une charge de mémoire de 2*14 Go, mais il semble que beaucoup de choses se passent. J'ai aussi essayé de ruser en utilisant chunkSize and maxTaskPerChild, etc mais je ne vois pas de différence dans l'optimisation à la fois dans le test et dans le gros fichier.

je pense que des améliorations à ce code est/sont requis à cette position dans le code, quand j'ai commencer multiprocessing.

p = Pool(3) # number of pool to run at once; default at 1 result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values())) mais, je vous poste le code complet.

exemple de Test: j'ai créé un fichier test ("genome_matrix_final-chr1234-1MB.txt") jusqu'à 250 mb et a géré le programme. Quand je regarde le moniteur de système, je peux voir que la consommation de mémoire a augmenté d'environ 6 Go. Je ne suis pas si clair pourquoi tant d'espace mémoire est pris par le fichier de 250 Mo plus quelques sorties. J'ai partagé ce fichier via drop box si elle contribue à en voyant le vrai problème. https://www.dropbox.com/sh/coihujii38t5prd/AABDXv8ACGIYczeMtzKBo0eea?dl=0

quelqu'un peut-il suggérer, comment je peux me débarrasser du problème?

Mon script python:

#!/home/bin/python3

import pandas as pd
import collections
from multiprocessing import Pool
import io
import time
import resource

print()
print('Checking required modules')
print()


''' change this input file name and/or path as need be '''
genome_matrix_file = "genome_matrix_final-chr1n2-2mb.txt"   # test file 01
genome_matrix_file = "genome_matrix_final-chr1234-1mb.txt"  # test file 02
#genome_matrix_file = "genome_matrix_final.txt"    # large file 

def main():
    with open("genome_matrix_header.txt") as header:
        header = header.read().rstrip('n').split('t')
        print()

    time01 = time.time()
    print('starting time: ', time01)

    '''load the genome matrix file onto pandas as dataframe.
    This makes is more easy for multiprocessing'''
    gen_matrix_df = pd.read_csv(genome_matrix_file, sep='t', names=header)

    # now, group the dataframe by chromosome/contig - so it can be multiprocessed
    gen_matrix_df = gen_matrix_df.groupby('CHROM')

    # store the splitted dataframes as list of key, values(pandas dataframe) pairs
    # this list of dataframe will be used while multiprocessing
    gen_matrix_df_list = collections.OrderedDict()
    for chr_, data in gen_matrix_df:
        gen_matrix_df_list[chr_] = data

    # clear memory
    del gen_matrix_df

    '''Now, pipe each dataframe from the list using map.Pool() '''
    p = Pool(3)  # number of pool to run at once; default at 1
    result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))

    del gen_matrix_df_list  # clear memory

    p.close()
    p.join()


    # concat the results from pool.map() and write it to a file
    result_merged = pd.concat(result)
    del result  # clear memory

    pd.DataFrame.to_csv(result_merged, "matrix_to_haplotype-chr1n2.txt", sep='t', header=True, index=False)

    print()
    print('completed all process in "%s" sec. ' % (time.time() - time01))
    print('Global maximum memory usage: %.2f (mb)' % current_mem_usage())
    print()


'''function to convert the dataframe from genome matrix to desired output '''
def matrix_to_vcf(matrix_df):

    print()
    time02 = time.time()

    # index position of the samples in genome matrix file
    sample_idx = [{'10a': 33, '10b': 18}, {'13a': 3, '13b': 19},
                    {'14a': 20, '14b': 4}, {'16a': 5, '16b': 21},
                    {'17a': 6, '17b': 22}, {'23a': 7, '23b': 23},
                    {'24a': 8, '24b': 24}, {'25a': 25, '25b': 9},
                    {'26a': 10, '26b': 26}, {'34a': 11, '34b': 27},
                    {'35a': 12, '35b': 28}, {'37a': 13, '37b': 29},
                    {'38a': 14, '38b': 30}, {'3a': 31, '3b': 15},
                    {'8a': 32, '8b': 17}]

    # sample index stored as ordered dictionary
    sample_idx_ord_list = []
    for ids in sample_idx:
        ids = collections.OrderedDict(sorted(ids.items()))
        sample_idx_ord_list.append(ids)


    # for haplotype file
    header = ['contig', 'pos', 'ref', 'alt']

    # adding some suffixes "PI" to available sample names
    for item in sample_idx_ord_list:
        ks_update = ''
        for ks in item.keys():
            ks_update += ks
        header.append(ks_update+'_PI')
        header.append(ks_update+'_PG_al')


    #final variable store the haplotype data
    # write the header lines first
    haplotype_output = 't'.join(header) + 'n'


    # to store the value of parsed the line and update the "PI", "PG" value for each sample
    updated_line = ''

    # read the piped in data back to text like file
    matrix_df = pd.DataFrame.to_csv(matrix_df, sep='t', index=False)

    matrix_df = matrix_df.rstrip('n').split('n')
    for line in matrix_df:
        if line.startswith('CHROM'):
            continue

        line_split = line.split('t')
        chr_ = line_split[0]
        ref = line_split[2]
        alt = list(set(line_split[3:]))

        # remove the alleles "N" missing and "ref" from the alt-alleles
        alt_up = list(filter(lambda x: x!='N' and x!=ref, alt))

        # if no alt alleles are found, just continue
        # - i.e : don't write that line in output file
        if len(alt_up) == 0:
            continue

        #print('nMining data for chromosome/contig "%s" ' %(chr_ ))
        #so, we have data for CHR, POS, REF, ALT so far
        # now, we mine phased genotype for each sample pair (as "PG_al", and also add "PI" tag)
        sample_data_for_vcf = []
        for ids in sample_idx_ord_list:
            sample_data = []
            for key, val in ids.items():
                sample_value = line_split[val]
                sample_data.append(sample_value)

            # now, update the phased state for each sample
            # also replacing the missing allele i.e "N" and "-" with ref-allele
            sample_data = ('|'.join(sample_data)).replace('N', ref).replace('-', ref)
            sample_data_for_vcf.append(str(chr_))
            sample_data_for_vcf.append(sample_data)

        # add data for all the samples in that line, append it with former columns (chrom, pos ..) ..
        # and .. write it to final haplotype file
        sample_data_for_vcf = 't'.join(sample_data_for_vcf)
        updated_line = 't'.join(line_split[0:3]) + 't' + ','.join(alt_up) + 
            't' + sample_data_for_vcf + 'n'
        haplotype_output += updated_line

    del matrix_df  # clear memory
    print('completed haplotype preparation for chromosome/contig "%s" '
          'in "%s" sec. ' %(chr_, time.time()-time02))
    print('tWorker maximum memory usage: %.2f (mb)' %(current_mem_usage()))

    # return the data back to the pool
    return pd.read_csv(io.StringIO(haplotype_output), sep='t')


''' to monitor memory '''
def current_mem_usage():
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.


if __name__ == '__main__':
    main()

mise à Jour pour les chasseurs de primes:

j'ai atteint le multiprocessing en utilisant Pool.map() mais le code cause une lourde charge de mémoire (fichier test d'entrée ~ 300 Mo, mais la charge de mémoire est d'environ 6 Go). J'ai été seulement on s'attend à un fardeau de mémoire de 3*300 mb au maximum.

  • quelqu'un peut-il expliquer, ce qui provoque une telle exigence de mémoire énorme pour un si petit fichier et pour un calcul de si petite longueur.
  • aussi, j'essaie de prendre la réponse et de l'utiliser pour améliorer le multiprocess dans mon grand programme. Ainsi, l'ajout de n'importe quelle méthode, module qui ne change pas trop la structure de la partie de calcul (processus lié au CPU) devrait être bien.
  • j'ai inclus deux fichiers de test pour les fins de test de jouer avec le code.
  • Le code joint est le code complet alors il devrait fonctionner comme prévu en tant qu'il est quand copié-collé. Toutes les modifications doivent être utilisés que pour améliorer l'optimisation de multitraitement étapes.
15
demandé sur Fermi paradox 2018-03-22 15:59:17

4 réponses

condition préalable

  1. En Python (j'utilise 64 bits version de Python 3.6.5) tout est un objet. Cela a ses frais généraux et getsizeof nous pouvons voir exactement de la taille d'un objet en octets:

    >>> import sys
    >>> sys.getsizeof(42)
    28
    >>> sys.getsizeof('T')
    50
    
  2. lorsque l'appel de système fork est utilisé (par défaut sur * nix, voir multiprocessing.get_start_method()) pour créer un processus enfant, la mémoire physique du parent n'est pas copiée et copy-on-write la technique est utilisée.
  3. le processus Fork child rapportera quand même le flux RSS complet (taille de l'ensemble résident) du processus parent. De ce fait, PSS (proportional set size) est une mesure plus appropriée pour estimer l'utilisation de la mémoire de l'application de bifurcation. Voici un exemple tiré de la page:
  • le procédé A A 50 KiB de mémoire non partagée
  • le procédé B A 300 KiB de mémoire non partagée
  • le procédé A et le procédé B ont tous deux 100 KiB de de la même zone de mémoire partagée

puisque le ssp est défini comme la somme de la mémoire non partagée d'un processus et de la proportion de mémoire partagée avec d'autres processus, les ssp pour ces deux processus sont les suivants:

  • PSS de processus A = 50 Kio + (100 Ko / 2) = 100 Kio
  • PSS du procédé B = 300 KiB + (100 KiB / 2) = 350 KiB

la base de données

ne regardons pas votre DataFrame seul. memory_profiler nous aidera.

justpd.py

#!/usr/bin/env python3

import pandas as pd
from memory_profiler import profile

@profile
def main():
    with open('genome_matrix_header.txt') as header:
        header = header.read().rstrip('\n').split('\t')

    gen_matrix_df = pd.read_csv(
        'genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)

    gen_matrix_df.info()
    gen_matrix_df.info(memory_usage='deep')

if __name__ == '__main__':
    main()

maintenant, utilisons le profileur:

mprof run justpd.py
mprof plot

on peut voir l'intrigue:

memory_profile

et ligne par ligne de trace:

Line #    Mem usage    Increment   Line Contents
================================================
     6     54.3 MiB     54.3 MiB   @profile
     7                             def main():
     8     54.3 MiB      0.0 MiB       with open('genome_matrix_header.txt') as header:
     9     54.3 MiB      0.0 MiB           header = header.read().rstrip('\n').split('\t')
    10                             
    11   2072.0 MiB   2017.7 MiB       gen_matrix_df = pd.read_csv('genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)
    12                                 
    13   2072.0 MiB      0.0 MiB       gen_matrix_df.info()
    14   2072.0 MiB      0.0 MiB       gen_matrix_df.info(memory_usage='deep')

nous pouvons voir que le cadre de données prend ~2 GiB avec le pic à ~3 GiB pendant qu'il est construit. Ce qui est plus intéressant est la sortie de info.

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4000000 entries, 0 to 3999999
Data columns (total 34 columns):
...
dtypes: int64(2), object(32)
memory usage: 1.0+ GB

Mais info(memory_usage='deep') ("deep" signifie l'introspection des données profondément par interrogation objectdtypes, voir ci-dessous) donne:

memory usage: 7.9 GB

Hein?! En regardant en dehors du processus nous pouvons nous assurer que memory_profiler's les chiffres sont exacts. sys.getsizeof montre aussi la même valeur pour le cadre (probablement à cause de custom __sizeof__) et donc d'autres outils qui l'utilisent pour estimer alloués gc.get_objects(), p.ex. pympler.

# added after read_csv
from pympler import tracker
tr = tracker.SummaryTracker()
tr.print_diff()   

Donne:

                                             types |   # objects |   total size
================================================== | =========== | ============
                 <class 'pandas.core.series.Series |          34 |      7.93 GB
                                      <class 'list |        7839 |    732.38 KB
                                       <class 'str |        7741 |    550.10 KB
                                       <class 'int |        1810 |     49.66 KB
                                      <class 'dict |          38 |      7.43 KB
  <class 'pandas.core.internals.SingleBlockManager |          34 |      3.98 KB
                             <class 'numpy.ndarray |          34 |      3.19 KB

alors d'où viennent ces 7.93 GiB? Nous allons essayer d'expliquer cela. Nous avons des rangées de 4m et 34 colonnes, ce qui nous donne des valeurs de 134M. Ils sont soit int64 ou object (qui est un pointeur 64 bits; voir utilisation de pandas avec de grandes données pour une explication détaillée). Ainsi, nous avons 134 * 10 ** 6 * 8 / 2 ** 20 ~1022 MiB que pour des valeurs dans le bloc de données. Qu'en est-il du reste ~ 6,93 GiB?

Chaîne stage

pour comprendre le comportement, il est nécessaire de savoir que Python fait l'interning des cordes. Il y a deux bons articles (,deux) sur la chaîne de stage en Python 2. En plus du changement Unicode en Python 3 et PEP 393 en Python 3.3 Les C-structures ont changé, mais l'idée est la même. Fondamentalement, chaque chaîne courte qui ressemble à un identifiant sera mise en cache par Python dans un dictionnaire interne et des références pointera vers les mêmes objets Python. En d'autres mots, nous pouvons dire qu'il se comporte comme un singleton. Les Articles que j'ai mentionnés ci-dessus expliquent le profil de mémoire significatif et les améliorations de performance qu'il donne. Nous pouvons vérifier si une chaîne est internée en utilisant interned champ PyASCIIObject:

import ctypes

class PyASCIIObject(ctypes.Structure):
     _fields_ = [
         ('ob_refcnt', ctypes.c_size_t),
         ('ob_type', ctypes.py_object),
         ('length', ctypes.c_ssize_t),
         ('hash', ctypes.c_int64),
         ('state', ctypes.c_int32),
         ('wstr', ctypes.c_wchar_p)
    ]

Puis:

>>> a = 'name'
>>> b = '!@#$'
>>> a_struct = PyASCIIObject.from_address(id(a))
>>> a_struct.state & 0b11
1
>>> b_struct = PyASCIIObject.from_address(id(b))
>>> b_struct.state & 0b11
0

avec deux chaînes nous pouvons aussi faire une comparaison d'identité (adressée en comparaison de mémoire dans le cas de Disponible).

>>> a = 'foo'
>>> b = 'foo'
>>> a is b
True
>> gen_matrix_df.REF[0] is gen_matrix_df.REF[6]
True

Parce que de ce fait, en ce qui concerne objectdtype, le data frame alloue au maximum 20 chaînes (une par acides aminés). Cependant, il est intéressant de noter que Pandas recommande catégorique types pour les énumérations.

Pandas mémoire

ainsi nous pouvons expliquer l'estimation naïve de 7.93 GiB comme:

>>> rows = 4 * 10 ** 6
>>> int_cols = 2
>>> str_cols = 32
>>> int_size = 8
>>> str_size = 58  
>>> ptr_size = 8
>>> (int_cols * int_size + str_cols * (str_size + ptr_size)) * rows / 2 ** 30
7.927417755126953

Notez que str_size 58 octets, et non pas 50 comme nous l'avons vu ci-dessus, pour les 1-caractère littéral. C'est parce que PEP 393 définit des cordes compactes et non compactes. Vous pouvez le vérifier avec sys.getsizeof(gen_matrix_df.REF[0]).

la consommation réelle de mémoire devrait être ~1 GiB comme il est rapporté par gen_matrix_df.info(), c'est deux fois plus. Nous pouvons supposer qu'il a quelque chose à voir avec la mémoire (pré)allocation faite par Pandas ou NumPy. L'expérience suivante montre que ce n'est pas sans raison (plusieurs passages montrent l'image de sauvegarde):

Line #    Mem usage    Increment   Line Contents
================================================
     8     53.1 MiB     53.1 MiB   @profile
     9                             def main():
    10     53.1 MiB      0.0 MiB       with open("genome_matrix_header.txt") as header:
    11     53.1 MiB      0.0 MiB           header = header.read().rstrip('\n').split('\t')
    12                             
    13   2070.9 MiB   2017.8 MiB       gen_matrix_df = pd.read_csv('genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)
    14   2071.2 MiB      0.4 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]])
    15   2071.2 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]])
    16   2040.7 MiB    -30.5 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    ...
    23   1827.1 MiB    -30.5 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    24   1094.7 MiB   -732.4 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    25   1765.9 MiB    671.3 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    26   1094.7 MiB   -671.3 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    27   1704.8 MiB    610.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    28   1094.7 MiB   -610.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    29   1643.9 MiB    549.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    30   1094.7 MiB   -549.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    31   1582.8 MiB    488.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    32   1094.7 MiB   -488.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])    
    33   1521.9 MiB    427.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])    
    34   1094.7 MiB   -427.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    35   1460.8 MiB    366.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    36   1094.7 MiB   -366.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    37   1094.7 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    ...
    47   1094.7 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])

je veux finir cette section par une citation de nouvel article sur les problèmes de conception et pandas futurs--65-- > par L'auteur original de Pandas.

pandas règle empirique: avoir 5 à 10 fois plus de RAM que la taille de votre ensemble de données

Arbre de processus

venez à la piscine, enfin, et voir si peut faire usage de la copie-sur-écriture. Nous allons utiliser smemstat (disponible dans un dépôt Ubuntu) pour estimer le partage de mémoire du groupe de processus et glances pour écrire à l'échelle du système libérer de la mémoire. Les deux peuvent écrire JSON.

nous allons lancer le script original avec Pool(2). Il nous faut 3 fenêtres de terminal.

  1. smemstat -l -m -p "python3.6 script.py" -o smemstat.json 1
  2. glances -t 1 --export-json glances.json
  3. mprof run -M script.py

mprof plot produit:

3 processes

La somme de graphique (mprof run --nopython --include-children ./script.py) ressemble à:

enter image description here

notez que deux Graphiques ci-dessus afficher les flux RSS. L'hypothèse est qu'à cause de la copie-on-write, elle ne reflète pas l'usage réel de la mémoire. Maintenant nous avons deux fichiers JSON de smemstat et glances. Je vais faire le script suivant pour cacher les fichiers JSON au CSV.

#!/usr/bin/env python3

import csv
import sys
import json

def smemstat():
  with open('smemstat.json') as f:
    smem = json.load(f)

  rows = []
  fieldnames = set()    
  for s in smem['smemstat']['periodic-samples']:
    row = {}
    for ps in s['smem-per-process']:
      if 'script.py' in ps['command']:
        for k in ('uss', 'pss', 'rss'):
          row['{}-{}'.format(ps['pid'], k)] = ps[k] // 2 ** 20

    # smemstat produces empty samples, backfill from previous
    if rows:            
      for k, v in rows[-1].items():
        row.setdefault(k, v)

    rows.append(row)
    fieldnames.update(row.keys())

  with open('smemstat.csv', 'w') as out:
    dw = csv.DictWriter(out, fieldnames=sorted(fieldnames))
    dw.writeheader()
    list(map(dw.writerow, rows))

def glances():
  rows = []
  fieldnames = ['available', 'used', 'cached', 'mem_careful', 'percent',
    'free', 'mem_critical', 'inactive', 'shared', 'history_size',
    'mem_warning', 'total', 'active', 'buffers']
  with open('glances.csv', 'w') as out:
    dw = csv.DictWriter(out, fieldnames=fieldnames)
    dw.writeheader()
    with open('glances.json') as f:
      for l in f:
        d = json.loads(l)
        dw.writerow(d['mem'])

if __name__ == '__main__':
  globals()[sys.argv[1]]()

d'abord, regardons free mémoire.

enter image description here

la différence entre le premier et le minimum est ~ 4.15 GiB. Et voici à quoi ressemblent les chiffres du PSS comme:

enter image description here

Et la somme:

enter image description here

nous pouvons donc voir qu'en raison de la consommation réelle de mémoire de copie-sur-écriture est d'environ 4.15 GiB. Mais nous sommes encore en train de sérialiser les données pour les envoyer aux processus des travailleurs via Pool.map. Pouvons-nous tirer parti de copie sur écriture ici aussi?

données partagées

Pour utiliser la copie sur écriture, nous avons besoin d' list(gen_matrix_df_list.values()) être accessible globalement donc le travailleur après fourchette peut encore le lire.

  1. modifions le code après del gen_matrix_dfmain comme suit:

    ...
    global global_gen_matrix_df_values
    global_gen_matrix_df_values = list(gen_matrix_df_list.values())
    del gen_matrix_df_list
    
    p = Pool(2)
    result = p.map(matrix_to_vcf, range(len(global_gen_matrix_df_values)))
    ...
    
  2. Supprimer del gen_matrix_df_list cela va plus tard.
  3. Et modifier les premières lignes de matrix_to_vcf comme:

    def matrix_to_vcf(i):
        matrix_df = global_gen_matrix_df_values[i]
    

maintenant, relisons-le. Libérer de la mémoire:

free

Processus arbre:

process tree

Et sa somme:

sum

ainsi nous sommes à un maximum de ~2,9 GiB d'utilisation de mémoire réelle (le processus principal de pointe a tout en construisant la base de données) et la copie-sur-l'écriture a aidé!

comme note secondaire, il y a ce qu'on appelle copie-sur-lecture, le comportement du collecteur de déchets du cycle de référence de Python, décrit dans Instagram Ingénierie (ce qui a conduit à gc.freeze issue31558). Mais gc.disable() n'a pas d'impact dans ce cas particulier.

mise à Jour

une alternative au partage de données copy-on-write copy-less peut être de le déléguer au noyau dès le début en utilisant numpy.memmap. Ici un exemple de mise en œuvre traitement de données haute Performance en Python parler. partie délicate est ensuite de faire des Pandas pour utiliser le mmaped Numpy array.

7
répondu saaj 2018-09-20 09:16:52

j'ai eu le même problème. J'avais besoin de traiter un énorme corpus de texte tout en conservant une base de connaissances de quelques DataFrames de millions de lignes chargées en mémoire. Je pense que cette question Est commune, donc je vais garder ma réponse orientée à des fins générales.

combinaison paramètres résolu le problème pour moi (1 & 3 & 5 ne peut le faire pour vous):

  1. Utiliser Pool.imap (ou imap_unordered) au lieu de Pool.map. Ceci itérera sur les données paresseusement que charger tout cela en mémoire avant de commencer le traitement.

  2. Définir une valeur chunksize paramètre. Cela rendra imap le plus rapide aussi.

  3. Définir une valeur maxtasksperchild paramètre.

  4. ajouter la sortie au disque qu'en mémoire. Instantanément ou chaque alors quand il atteint une certaine taille.

  5. exécutez le code dans différents lots. Vous pouvez utiliser itertools.islice si vous avez une itérateur. L'idée est de diviser votre list(gen_matrix_df_list.values()) à trois ou plusieurs listes, puis vous passez le premier tiers seulement map ou imap, puis le deuxième tiers dans un autre passage, etc. Puisque vous avez une liste, vous pouvez simplement couper dans la même ligne de code.

3
répondu Abdo 2018-03-24 23:16:27

quand vous utilisez multiprocessing.Pool un certain nombre de processus enfants seront créés en utilisant le fork() appel système. Chacun de ces procédés commencer avec une copie exacte de la mémoire du processus parent à l'époque. Parce que vous chargez le csv avant de créer le Pool de taille 3, chacun de ces 3 processus dans la piscine inutilement avoir une copie de la trame de données. (gen_matrix_df ainsi que gen_matrix_df_list existent dans le processus actuel ainsi que dans chacune des 3 processus enfants, donc en 4 exemplaires chacune de ces structures sera en mémoire)

Essayez de créer le Pool avant de charger le fichier (au tout début en fait), Qui devrait réduire l'utilisation de la mémoire.

Si c'est encore trop élevé, vous pouvez:

  1. Dump gen_matrix_df_list vers un fichier, 1 item par ligne, E. g:

    import os
    import cPickle
    
    with open('tempfile.txt', 'w') as f:
        for item in gen_matrix_df_list.items():
            cPickle.dump(item, f)
            f.write(os.linesep)
    
  2. Utiliser Pool.imap() sur un itérateur sur les lignes que vous sous-évaluées dans ce fichier, par exemple:

    with open('tempfile.txt', 'r') as f:
        p.imap(matrix_to_vcf, (cPickle.loads(line) for line in f))
    

    (noter que matrix_to_vcf prend un (key, value) tuple dans l'exemple ci-dessus, pas juste une valeur)

j'espère que ça aide.

NB: Je n'ai pas testé le code ci-dessus. Il est uniquement destiné à montrer l'idée.

3
répondu tomas 2018-03-26 23:40:42

GENERAL ANSWER ABOUT MEMORY WITH MULTIPROCESSING

vous avez demandé:"Qu'est-ce qui fait que tant de mémoire soit allouée". La réponse repose sur deux parties.

Premier, comme vous l'avez déjà remarqué, multiprocessing travailleur obtienne sa propre copie des données (cité de là), donc vous devriez découper de grands arguments. Ou pour les gros fichiers, lisez-les un peu à la fois, si possible.

par par défaut, les opérateurs de la piscine sont des processus Python réels fourchés utiliser le module multiprocessing de la bibliothèque standard Python lorsque n_jobs != 1. Les arguments transmis en tant qu'input à L'appel parallèle sont: sérialisé et réaffecté dans la mémoire de chaque processus de travail.

Cela peut être problématique pour les grands arguments qu'ils seront réaffectées n_jobs fois par les travailleurs.

Deuxième, si vous essayez de récupérer de la mémoire, vous avez besoin comprendre que python fonctionne différemment des autres langues, et vous comptez sur del pour libérer la mémoire quand ça ne marche pas. Je ne sais pas si c'est mieux, mais dans mon propre code, j'ai surmonté ça en réassignant la variable à un objet nul ou vide.

FOR YOUR SPECIFIC EXAMPLE-MINIMAL CODE EDITING

tant Que vous pouvez adapter à vos données de grande taille dans la mémoire deux fois, je pense que vous pouvez faire ce que vous essayez faire en changeant juste une seule ligne. J'ai écrit un code très similaire et cela a fonctionné pour moi lorsque j'ai réaffecté la variable (vice call del ou tout autre type de collecte des ordures). Si cela ne fonctionne pas, vous pouvez avoir besoin de suivre les suggestions ci-dessus et d'utiliser l'E/S du disque:

    #### earlier code all the same
    # clear memory by reassignment (not del or gc)
    gen_matrix_df = {}

    '''Now, pipe each dataframe from the list using map.Pool() '''
    p = Pool(3)  # number of pool to run at once; default at 1
    result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))

    #del gen_matrix_df_list  # I suspect you don't even need this, memory will free when the pool is closed

    p.close()
    p.join()
    #### later code all the same

FOR YOUR SPECIFIC EXAMPLE-OPTIMAL MEMORY USAGE

tant Que vous pouvez adapter à vos données de grande taille dans la mémoire une fois, et vous aurez une idée de la taille de votre fichier, vous pouvez utiliser Pandas read_csv partielle de la lecture des fichiers, à lire dans seulement nrows à un moment si vous voulez vraiment micro-gérer combien de données est lu dans, ou une [quantité fixe de mémoire à la fois en utilisant chunksize], qui renvoie un itérateur 5. Par cela je veux dire, le paramètre nrows est juste une simple lecture: vous pourriez l'utiliser pour juste obtenir un coup d'oeil à un dossier, ou si pour une raison quelconque vous vouliez que chaque partie ait exactement le même nombre de lignes (parce que, pour exemple, si vos données sont des chaînes de longueur variable, chaque ligne ne prendra pas la même quantité de mémoire). Mais je pense que dans le but de préparer un fichier pour le multiprocessing, il sera beaucoup plus facile d'utiliser des morceaux, parce que cela se rapporte directement à la mémoire, ce qui est votre préoccupation. Il sera plus facile d'utiliser trial & error pour s'adapter à la mémoire basée sur des morceaux de taille spécifique que le nombre de lignes, ce qui changera la quantité d'utilisation de la mémoire en fonction de combien de données se trouve dans les lignes. Le seul autre la partie difficile est que pour une raison spécifique à une application, vous regroupez quelques lignes, donc cela rend juste un peu plus compliqué. À l'aide de votre code comme exemple:

   '''load the genome matrix file onto pandas as dataframe.
    This makes is more easy for multiprocessing'''

    # store the splitted dataframes as list of key, values(pandas dataframe) pairs
    # this list of dataframe will be used while multiprocessing
    #not sure why you need the ordered dict here, might add memory overhead
    #gen_matrix_df_list = collections.OrderedDict()  
    #a defaultdict won't throw an exception when we try to append to it the first time. if you don't want a default dict for some reason, you have to initialize each entry you care about.
    gen_matrix_df_list = collections.defaultdict(list)   
    chunksize = 10 ** 6

    for chunk in pd.read_csv(genome_matrix_file, sep='\t', names=header, chunksize=chunksize)
        # now, group the dataframe by chromosome/contig - so it can be multiprocessed
        gen_matrix_df = chunk.groupby('CHROM')
        for chr_, data in gen_matrix_df:
            gen_matrix_df_list[chr_].append(data)

    '''Having sorted chunks on read to a list of df, now create single data frames for each chr_'''
    #The dict contains a list of small df objects, so now concatenate them
    #by reassigning to the same dict, the memory footprint is not increasing 
    for chr_ in gen_matrix_df_list.keys():
        gen_matrix_df_list[chr_]=pd.concat(gen_matrix_df_list[chr_])

    '''Now, pipe each dataframe from the list using map.Pool() '''
    p = Pool(3)  # number of pool to run at once; default at 1
    result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))
    p.close()
    p.join()
2
répondu Jeff Ellen 2018-03-31 15:25:02