Compter le nombre d'entrées non-NaN dans chaque colonne de Spark dataframe avec Pyspark
j'ai un très grand ensemble de données qui est chargé dans la ruche. Il se compose d'environ 1,9 millions de lignes et 1450 colonnes. Je dois déterminer la" couverture " de chacune des colonnes, c'est-à-dire la fraction de lignes qui ont des valeurs non-NaN pour chaque colonne.
Voici mon code:
from pyspark import SparkContext
from pyspark.sql import HiveContext
import string as string
sc = SparkContext(appName="compute_coverages") ## Create the context
sqlContext = HiveContext(sc)
df = sqlContext.sql("select * from data_table")
nrows_tot = df.count()
covgs=sc.parallelize(df.columns)
.map(lambda x: str(x))
.map(lambda x: (x, float(df.select(x).dropna().count()) / float(nrows_tot) * 100.))
essayer ceci dans le shell pyspark, si je fais alors covgs.take (10), il renvoie une pile d'erreurs assez grande. Il dit qu'il y a un problème dans enregistrer dans le fichier /usr/lib64/python2.6/pickle.py
. C'est la dernière une partie de l'erreur:
py4j.protocol.Py4JError: An error occurred while calling o37.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
S'il y a une meilleure façon d'accomplir ceci que celle que j'essaie, je suis ouvert aux suggestions. Je ne peux pas utiliser pandas, cependant, car il n'est pas actuellement disponible sur le cluster sur lequel je travaille et je n'ai pas le droit de l'installer.
1 réponses
commençons avec un mannequin de données:
from pyspark.sql import Row
row = Row("v", "x", "y", "z")
df = sc.parallelize([
row(0.0, 1, 2, 3.0), row(None, 3, 4, 5.0),
row(None, None, 6, 7.0), row(float("Nan"), 8, 9, float("NaN"))
]).toDF()
## +----+----+---+---+
## | v| x| y| z|
## +----+----+---+---+
## | 0.0| 1| 2|3.0|
## |null| 3| 4|5.0|
## |null|null| 6|7.0|
## | NaN| 8| 9|NaN|
## +----+----+---+---+
Tous vous avez besoin est d'une simple agrégation:
from pyspark.sql.functions import col, count, isnan, lit, sum
def count_not_null(c, nan_as_null=False):
"""Use conversion between boolean and integer
- False -> 0
- True -> 1
"""
pred = col(c).isNotNull() & (~isnan(c) if nan_as_null else lit(True))
return sum(pred.cast("integer")).alias(c)
df.agg(*[count_not_null(c) for c in df.columns]).show()
## +---+---+---+---+
## | v| x| y| z|
## +---+---+---+---+
## | 2| 3| 4| 4|
## +---+---+---+---+
ou si vous voulez traiter NaN
NULL
:
df.agg(*[count_not_null(c, True) for c in df.columns]).show()
## +---+---+---+---+
## | v| x| y| z|
## +---+---+---+---+
## | 1| 3| 4| 3|
## +---+---+---+---
vous pouvez aussi utiliser SQL NULL
sémantique pour obtenir le même résultat sans création d'une fonction personnalisée:
df.agg(*[
count(c).alias(c) # vertical (column-wise) operations in SQL ignore NULLs
for c in df.columns
]).show()
## +---+---+---+
## | x| y| z|
## +---+---+---+
## | 1| 2| 3|
## +---+---+---+
mais cela ne fonctionne pas avec NaNs
.
Si vous préférez les fractions:
exprs = [(count_not_null(c) / count("*")).alias(c) for c in df.columns]
df.agg(*exprs).show()
## +------------------+------------------+---+
## | x| y| z|
## +------------------+------------------+---+
## |0.3333333333333333|0.6666666666666666|1.0|
## +------------------+------------------+---+
ou
# COUNT(*) is equivalent to COUNT(1) so NULLs won't be an issue
df.select(*[(count(c) / count("*")).alias(c) for c in df.columns]).show()
## +------------------+------------------+---+
## | x| y| z|
## +------------------+------------------+---+
## |0.3333333333333333|0.6666666666666666|1.0|
## +------------------+------------------+---+