Flux de travail "données importantes" à l'aide de pandas
j'ai essayé de trouver une réponse à cette question pendant de nombreux mois tout en apprenant les pandas. J'utilise SAS pour mon travail quotidien et c'est génial car c'est un support de base. Cependant, SAS est un logiciel horrible pour de nombreuses autres raisons.
un jour, j'espère remplacer mon utilisation de SAS par python et pandas, mais je manque actuellement d'un workflow hors-noyau pour les grands ensembles de données. Je ne parle pas de "big data", qui nécessite un réseau distribué, mais plutôt fichiers trop volumineux pour tenir dans la mémoire, mais assez petit pour tenir sur un disque dur.
ma première pensée est d'utiliser HDFStore
pour maintenir les grands ensembles de données sur le disque et tirer seulement les morceaux dont j'ai besoin dans les images de données pour l'analyse. D'autres ont mentionné le MongoDB comme une alternative plus facile à utiliser. Ma question Est la suivante:
Quelles sont certaines des meilleures pratiques de flux de travail pour l'accomplissement de ce qui suit:
- chargement de fichiers plats dans un permanente, sur disque structure de base de données
- interrogation de cette base de données pour récupérer des données pour alimenter une structure de données pandas
- mise à Jour de la base de données après la manipulation de pièces dans les pandas
les exemples du monde réel seraient très appréciés, en particulier de la part de quiconque utilise pandas sur les "données volumineuses".
éditer -- un exemple de la façon dont je voudrais que cela fonctionne:
- Importer itérativement un grand fichier plat et le stocker dans une structure de base de données permanente sur disque. Ces fichiers sont généralement trop volumineux pour être stockés en mémoire.
- afin d'utiliser les Pandas, je voudrais lire des sous-ensembles de ces données (habituellement quelques colonnes à la fois) qui peut s'adapter à la mémoire.
- je créerais de nouvelles colonnes en effectuant diverses opérations sur les colonnes sélectionnées.
- je devrais alors ajouter ces nouvelles colonnes en la structure de base de données.
j'essaie de trouver un moyen pour effectuer ces étapes. Lire les liens sur pandas et pytables il semble que l'ajout d'une nouvelle colonne pourrait être un problème.
éditer -- répondre aux questions de Jeff spécifiquement:
- je suis bâtiment consommateur modèles de risque de crédit. Les types de données comprennent les caractéristiques du téléphone, du réseau social et de l'adresse; la valeur des propriétés; les caractéristiques dérogatoires. information comme les casiers judiciaires, les faillites, etc... Les ensembles de données que j'utilise chaque jour comportent en moyenne de 1 000 à 2 000 champs de données mixtes: variables continues, nominales et ordinales de données numériques et de caractères. J'ajoute rarement des lignes, mais j'effectue de nombreuses opérations qui créent de nouvelles colonnes.
- opérations typiques impliquent la combinaison de plusieurs colonnes utilisant la logique conditionnelle dans une nouvelle colonne composée. Par exemple,
if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B'
. Le résultat de ces opérations est une nouvelle colonne pour chaque enregistrement dans mon ensemble de données. - enfin, j'aimerais ajouter ces nouvelles colonnes à la structure de données sur disque. Je répéterai l'étape 2, en explorant les données à l'aide de tableaux croisés et de statistiques descriptives, en essayant de trouver des relations intuitives et intéressantes avec le modèle.
- un fichier de projet typique est habituellement d'environ 1 Go. Les fichiers sont organisés de telle manière qu'une rangée consiste en un enregistrement de données sur les consommateurs. Chaque ligne a le même nombre de colonnes pour chaque enregistrement. Ce sera toujours le cas.
- c'est assez rare que je fasse un sous-ensemble par lignes lors de la création d'une nouvelle colonne. Cependant, il est assez commun pour moi de sous-ensemble sur les lignes lors de la création de rapports ou de générer des statistiques descriptives. Par exemple, je pourrais vouloir créer une fréquence simple pour un secteur d'activité spécifique, disons les cartes de crédit de détail. Pour ce faire, je choisirais seulement les documents où le secteur d'activité = commerce de détail en plus de selon les colonnes je tiens à signaler sur. Lors de la création de nouvelles colonnes, cependant, je tirerais toutes les lignes de données et seulement les colonnes dont j'ai besoin pour les opérations.
- le processus de modélisation exige que j'analyse chaque colonne, que je cherche des relations intéressantes avec une variable de résultat, et de créer de nouvelles colonnes composées qui décrivent ces relations. Les colonnes que j'explore sont habituellement faites en petits ensembles. Par exemple, je vais me concentrer sur un ensemble de 20 colonnes affaire avec la valeur de la propriété et d'observer comment ils se rapportent à défaut sur un prêt. Une fois que ces colonnes sont explorées et que de nouvelles colonnes sont créées, je passe à un autre groupe de colonnes, disons l'enseignement collégial, et je répète le processus. Ce que je fais, c'est créer des variables candidates qui expliquent la relation entre mes données et certains résultats. À la fin de ce processus, j'applique certaines techniques d'apprentissage qui créent une équation de ceux composés de colonnes.
It il est rare que j'ajoute des lignes à l'ensemble de données. Je vais presque toujours créer de nouvelles colonnes (variables ou caractéristiques dans le langage statistique/machine learning).
13 réponses
j'utilise régulièrement plusieurs dizaines de gigaoctets de données dans ce mode par exemple, j'ai des tables sur le disque que je lis via des requêtes, créer des données et ajouter en arrière.
il est intéressant de lire les docs et tard dans ce fil pour plusieurs suggestions sur la façon de stocker vos données.
détails qui affecteront la façon dont vous stockez vos données, comme:
donnez autant de détails que vous peut; et je peux vous aider à mettre en place une structure.
- taille des données, nombre de lignes, colonnes, types de colonnes; annexez-vous des rangées, ou juste des colonnes?
- à quoi ressembleront les opérations typiques. E. g. faire une requête sur les colonnes pour sélectionner un groupe de lignes et de colonnes spécifiques, puis faire une opération (en mémoire), créer de nouvelles colonnes, enregistrer ces.
(Pour donner un exemple jouet pourrait nous permettre d'offrir plus spécifiques recommandation.) - après ce traitement, Que faites-vous? L'étape 2 est-elle ponctuelle ou répétitive?
- Entrée plat des fichiers: combien d', bruts total taille en Go. Comment sont-ils organisés par exemple par des enregistrements? Chacun contient différents champs, ou ont-ils certains enregistrements par fichier avec tous les champs de chaque fichier?
- avez-vous déjà sélectionné des sous-ensembles de lignes (enregistrements) en fonction de critères (par exemple, sélectionnez les lignes avec le champ a > 5)? et alors, faites quelque chose, ou choisissez-vous juste les champs A, B, C avec tous les enregistrements (et ensuite faites quelque chose)?
- est-ce que vous "travaillez" sur toutes vos colonnes (en groupes), ou y a-t-il une bonne proportion que vous ne pouvez utiliser que pour les rapports (par exemple, vous voulez garder les données, mais vous n'avez pas besoin de tirer dans cette colonne explicité jusqu'au temps des résultats finaux)?
Solution
assurez-vous d'avoir pandas à minimum 0.10.1
installé.
Lire itération fichiers de morceau par morceau et plusieurs requêtes de table .
puisque pytables est optimisé pour fonctionner en ligne (ce qui est ce sur quoi vous posez des questions), nous allons créer une table pour chaque groupe de champs. De cette façon, il est facile de sélectionner un petit groupe de champs (qui fonctionne avec une grande table, mais il est plus efficace de faire de cette façon... Je je pense pouvoir corriger cette limitation dans le futur... c'est plus intuitif de toute façon):
(Ce qui suit est pseudocode.)
import numpy as np
import pandas as pd
# create a store
store = pd.HDFStore('mystore.h5')
# this is the key to your storage:
# this maps your fields to a specific group, and defines
# what you want to have as data_columns.
# you might want to create a nice class wrapping this
# (as you will want to have this map and its inversion)
group_map = dict(
A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']),
B = dict(fields = ['field_10',...... ], dc = ['field_10']),
.....
REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []),
)
group_map_inverted = dict()
for g, v in group_map.items():
group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))
lire dans les fichiers et créer le stockage (essentiellement faire ce que append_to_multiple
fait):
for f in files:
# read in the file, additional options hmay be necessary here
# the chunksize is not strictly necessary, you may be able to slurp each
# file into memory in which case just eliminate this part of the loop
# (you can also change chunksize if necessary)
for chunk in pd.read_table(f, chunksize=50000):
# we are going to append to each table by group
# we are not going to create indexes at this time
# but we *ARE* going to create (some) data_columns
# figure out the field groupings
for g, v in group_map.items():
# create the frame for this group
frame = chunk.reindex(columns = v['fields'], copy = False)
# append it
store.append(g, frame, index=False, data_columns = v['dc'])
Maintenant vous avez toutes les tables dans le fichier (en fait vous pouvez les stocker dans des fichiers séparés si vous le souhaitez, vous prob devrait ajouter le nom de fichier à group_map, mais probablement ce n'est pas nécessaire).
C'est ainsi que vous obtenez des colonnes et en créez de nouvelles:
frame = store.select(group_that_I_want)
# you can optionally specify:
# columns = a list of the columns IN THAT GROUP (if you wanted to
# select only say 3 out of the 20 columns in this sub-table)
# and a where clause if you want a subset of the rows
# do calculations on this frame
new_frame = cool_function_on_frame(frame)
# to 'add columns', create a new group (you probably want to
# limit the columns in this new_group to be only NEW ones
# (e.g. so you don't overlap from the other tables)
# add this info to the group_map
store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)
Lorsque vous êtes prêt pour post_processing:
# This may be a bit tricky; and depends what you are actually doing.
# I may need to modify this function to be a bit more general:
report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)
à propos de data_columns, vous n'avez pas besoin de définir un data_columns; ils vous permettent de sous-sélectionner des lignes basées sur la colonne. Par exemple: quelque chose comme:
store.select(group, where = ['field_1000=foo', 'field_1001>0'])
ils peuvent être les plus intéressants pour vous dans la finale étape de production du rapport (essentiellement, une colonne de données est séparée des autres colonnes, ce qui pourrait avoir une certaine incidence sur l'efficience si vous définissez un lot).
Vous pourriez aussi vouloir:
- créer une fonction qui prend une liste de champs, recherche les groupes dans groups_map, puis sélectionne ces derniers et concaténate les résultats de sorte que vous obtenez le cadre résultant (c'est essentiellement ce que select_as_multiple fait). de cette façon la structure serait assez transparente pour vous.
- indexes sur certaines colonnes de données (rend row-subsetting beaucoup plus rapide).
- permet la compression.
dites-moi quand vous avez des questions!
je pense que les réponses ci-dessus manquent une approche simple que j'ai trouvé très utile.
quand j'ai un fichier trop gros pour être chargé en mémoire, je le décompose en plusieurs fichiers plus petits (par ligne ou cols)
exemple: en cas de valeur de 30 jours de données de trading de taille ~30Go, je le brise dans un fichier par jour de taille ~1Go. Par la suite, je traite chaque dossier séparément et je additionne les résultats à la fin de la période 151910920."
L'un des plus grands avantages est qu'il permet le traitement parallèle des fichiers (plusieurs threads ou processus)
l'autre avantage est que la manipulation de fichiers (comme Ajouter/Supprimer des dates dans l'exemple) peut être effectuée par des commandes shell régulières, ce qui n'est pas possible dans des formats de fichiers plus avancés/compliqués
cette approche ne couvre pas tous les scénarios, mais est très utile dans un grand nombre d'entre eux
si vos ensembles de données sont entre 1 et 20 Go, vous devriez obtenir une station de travail avec 48 Go de RAM. Pandas peut alors contenir L'ensemble des données en mémoire vive. Je sais que ce n'est pas la réponse que vous cherchez ici, mais faire de l'informatique scientifique sur un ordinateur portable avec 4 Go de mémoire vive n'est pas raisonnable.
je sais que c'est un vieux fil, mais je pense que la bibliothèque Blaze mérite d'être vérifiée. C'est fait pour ce genre de situations.
à Partir de la documentation:
Blaze étend L'utilisabilité de NumPy et Pandas au calcul distribué et Hors-noyau. Blaze fournit une interface similaire à celle du DataFrame numpy ND-Array ou Pandas, mais établit une correspondance entre ces interfaces familières et une variété d'autres interfaces. des moteurs de calcul comme Postgres ou Spark.
Edit: Par ailleurs, il est pris en charge par ContinuumIO et Travis Oliphant, auteur de NumPy.
C'est le cas de pymongo. J'ai aussi prototypé en utilisant sql server, sqlite, HDF, ORM (SQLAlchemy) en python. D'abord et avant tout pymongo est un DB basé sur un document, donc chaque personne serait un document ( dict
d'attributs). Beaucoup de gens forment une collection et vous pouvez avoir beaucoup de collections (personnes, bourse, revenu).
pd.dateframe - > pymongo Note: j'utilise le chunksize
dans read_csv
pour le conserver dans des enregistrements de 5 à 10k(pymongo laisse tomber la socket si en plus grand)
aCollection.insert((a[1].to_dict() for a in df.iterrows()))
l'interrogation: gt = supérieur...
pd.DataFrame(list(mongoCollection.find({'anAttribute':{'$gt':2887000, '$lt':2889000}})))
.find()
retourne un itérateur donc j'utilise couramment ichunked
pour couper en itérateurs plus petits.
Que Diriez-vous d'une jointure puisque j'obtiens Normalement 10 sources de données à coller ensemble:
aJoinDF = pandas.DataFrame(list(mongoCollection.find({'anAttribute':{'$in':Att_Keys}})))
puis (dans mon cas je dois parfois agg sur aJoinDF
d'abord avant son "fusible".)
df = pandas.merge(df, aJoinDF, on=aKey, how='left')
et vous pouvez alors écrire les nouvelles informations à votre collection principale via la méthode de mise à jour ci-dessous. (collecte logique vs sources de données physiques).
collection.update({primarykey:foo},{key:change})
sur les plus petits lookups, juste dénormaliser. Par exemple, vous avez du code dans le document et vous n'avez qu'à ajouter le texte du code de zone et faire une recherche dict
pendant que vous créez des documents.
Maintenant vous avez un bel ensemble de données basées sur une personne, vous pouvez libérer votre logique sur chaque cas et faire plus d'attributs. Enfin, vous pouvez lire dans pandas votre 3 à la mémoire max indicateurs clés et faire pivots / agg / exploration de données. Cela fonctionne pour moi, pour 3 millions d'enregistrements avec des nombres/texte/catégories/codes/flotteurs/...
vous pouvez également utiliser les deux méthodes intégrées dans MongoDB (MapReduce et aggregate framework). voir ici pour plus d'informations sur le cadre agrégé , car il semble être plus facile que MapReduce et semble pratique pour le travail agrégé rapide. Notez que je n'avais pas besoin de définir mes domaines ou relations, et je peux ajouter des éléments à un document. À l'état actuel de l'évolution rapide de numpy, pandas, Python toolset, MongoDB m'aide juste à me mettre au travail:)
j'ai repéré cela un peu tard, mais je travaille avec un problème similaire (modèles de remboursement anticipé d'hypothèque). Ma solution a été de sauter la couche pandas HDFStore et d'utiliser des pytables droits. Je sauve chaque colonne comme un tableau HDF5 individuel dans mon fichier final.
mon workflow de base est d'abord d'obtenir un fichier CSV à partir de la base de données. Je l'ai ouvert, donc il n'est pas aussi énorme. Puis je le convertis en un fichier HDF5 orienté ligne, en itérant dessus en python, en convertissant chaque ligne en données réelles taper, et l'écrire dans un fichier HDF5. Cela prend quelques dizaines de minutes, mais il n'utilise pas de mémoire, car il ne fonctionne que ligne par ligne. Puis j'ai "transposé" le fichier HDF5 orienté ligne dans un fichier HDF5 orienté colonne.
le tableau transpose ressemble à:
def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"):
# Get a reference to the input data.
tb = h_in.getNode(table_path)
# Create the output group to hold the columns.
grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1))
for col_name in tb.colnames:
logger.debug("Processing %s", col_name)
# Get the data.
col_data = tb.col(col_name)
# Create the output array.
arr = h_out.createCArray(grp,
col_name,
tables.Atom.from_dtype(col_data.dtype),
col_data.shape)
# Store the data.
arr[:] = col_data
h_out.flush()
la relire ressemble alors à:
def read_hdf5(hdf5_path, group_path="/data", columns=None):
"""Read a transposed data set from a HDF5 file."""
if isinstance(hdf5_path, tables.file.File):
hf = hdf5_path
else:
hf = tables.openFile(hdf5_path)
grp = hf.getNode(group_path)
if columns is None:
data = [(child.name, child[:]) for child in grp]
else:
data = [(child.name, child[:]) for child in grp if child.name in columns]
# Convert any float32 columns to float64 for processing.
for i in range(len(data)):
name, vec = data[i]
if vec.dtype == np.float32:
data[i] = (name, vec.astype(np.float64))
if not isinstance(hdf5_path, tables.file.File):
hf.close()
return pd.DataFrame.from_items(data)
maintenant, je cours généralement sur une machine avec une tonne de mémoire, donc je ne pourrais pas être assez prudent avec mon l'utilisation de la mémoire. Par exemple, par défaut, l'opération de chargement lit l'ensemble du jeu de données.
cela fonctionne généralement pour moi, mais c'est un peu lourd, et je ne peux pas utiliser la magie pytables Fantaisie.
Edit: le véritable avantage de cette approche, par rapport au tableau-of-records pytables par défaut, est que je peux ensuite charger les données dans R en utilisant h5r, qui ne peut pas gérer les tables. Ou, du moins, je n'ai pas pu l'amener à charger des tables hétérogènes.
une autre variation
beaucoup des opérations effectuées dans les pandas peuvent aussi être faites comme une requête db (sql, mongo)
L'utilisation D'un RDBMS ou mongodb vous permet d'effectuer certaines agrégations dans la requête DB (qui est optimisée pour les grandes données, et utilise le cache et les index efficacement)
plus tard, vous pouvez effectuer le post-traitement en utilisant pandas.
l'avantage de cette méthode est que vous gagnez la PD optimisations pour travailler avec de grandes données, tout en définissant la logique dans une syntaxe déclarative de haut niveau - et ne pas avoir à traiter les détails de décider ce qu'il faut faire en mémoire et ce qu'il faut faire hors du noyau.
et bien que le langage de requête et les pandas soient différents, il n'est généralement pas compliqué de traduire une partie de la logique de l'un à l'autre.
Un truc que j'ai trouvé utile pour les "grandes données" cas d'utilisation est de réduire le volume des données en réduisant la précision de 32 bits. Il n'est pas applicable dans tous les cas, mais dans de nombreuses applications, la précision 64 bits est excessive et les économies de mémoire 2x en valent la peine. Pour rendre un point évident encore plus évident:
>>> df = pd.DataFrame(np.random.randn(int(1e8), 5))
>>> df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float64(5)
memory usage: 3.7 GB
>>> df.astype(np.float32).info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float32(5)
memory usage: 1.9 GB
tel que noté par d'autres, après quelques années un équivalent pandas" hors-de-cœur "a émergé: dask . Bien que DASK ne soit pas un remplacement drop-in de pandas et de toutes ses fonctionnalités, il se démarque pour plusieurs raisons:
Dask est une bibliothèque de calcul parallèle flexible pour le calcul analytique qui est optimisé pour la planification dynamique des tâches pour les charges de travail computationnelles interactives de Les collections de "Big Data" comme les tableaux parallèles, les images de données et les listes qui étendent les interfaces communes comme les itérateurs NumPy, Pandas ou Python à des environnements plus grands que la mémoire ou distribués et les échelles des ordinateurs portables aux clusters.
DASK souligne les vertus suivantes:
- Familier: Fournit parallélisée tableau NumPy et les Pandas DataFrame objets
- Flexible: fournit une interface de planification des tâches pour des charges de travail plus personnalisées et l'intégration avec d'autres projet.
- natif: permet le calcul distribué en Python pur avec accès à la pile PyData.
- Fast: fonctionne avec une faible charge, une faible latence, et une sérialisation minimale nécessaire pour les Algorithmes numériques rapides
- Echelles vers le haut: fonctionne avec résilience sur les grappes avec des milliers de noyaux Echelles vers le bas: Trivial pour mettre en place et exécuter sur un ordinateur portable dans un seul processus
- Responsive: Conçu avec l'informatique interactive à l'esprit, il fournit une rétroaction rapide et des diagnostics pour aider les humains
et pour ajouter un exemple de code simple:
import dask.dataframe as dd
df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean().compute()
remplace certains pandas comme celui-ci:
import pandas as pd
df = pd.read_csv('2015-01-01.csv')
df.groupby(df.user_id).value.mean()
et, surtout digne de mention, fournit à travers le concurrent.les contrats à terme de l'interface générale pour la soumission de tâches personnalisées:
from dask.distributed import Client
client = Client('scheduler:port')
futures = []
for fn in filenames:
future = client.submit(load, fn)
futures.append(future)
summary = client.submit(summarize, futures)
summary.result()
prendre en considération Ruffus si vous allez à la simple voie de la création d'un pipeline de données, qui est divisé en plusieurs petits fichiers.
il est intéressant de mentionner ici rayon ainsi,
c'est un cadre de calcul distribué, qui a sa propre implémentation pour pandas d'une manière distribuée.
il suffit de remplacer l'importation pandas, et le code devrait fonctionner comme il est:
# import pandas as pd
import ray.dataframe as pd
#use pd as usual
peut lire plus de détails ici:
j'ai récemment rencontré un problème similaire. J'ai simplement trouvé que lire les données en morceaux et les ajouter comme je l'écris en morceaux à la même csv fonctionne bien. Mon problème était d'ajouter une colonne date basée sur des informations dans un autre tableau, en utilisant la valeur de certaines colonnes comme suit. Cela peut aider ceux confondus par dask et hdf5, mais plus familier avec les pandas comme moi.
def addDateColumn():
"""Adds time to the daily rainfall data. Reads the csv as chunks of 100k
rows at a time and outputs them, appending as needed, to a single csv.
Uses the column of the raster names to get the date.
"""
df = pd.read_csv(pathlist[1]+"CHIRPS_tanz.csv", iterator=True,
chunksize=100000) #read csv file as 100k chunks
'''Do some stuff'''
count = 1 #for indexing item in time list
for chunk in df: #for each 100k rows
newtime = [] #empty list to append repeating times for different rows
toiterate = chunk[chunk.columns[2]] #ID of raster nums to base time
while count <= toiterate.max():
for i in toiterate:
if i ==count:
newtime.append(newyears[count])
count+=1
print "Finished", str(chunknum), "chunks"
chunk["time"] = newtime #create new column in dataframe based on time
outname = "CHIRPS_tanz_time2.csv"
#append each output to same csv, using no header
chunk.to_csv(pathlist[2]+outname, mode='a', header=None, index=None)