Meilleure façon d'obtenir la valeur max dans une Étincelle dataframe colonne
j'essaie de trouver la meilleure façon d'obtenir la plus grande valeur dans une colonne de base de données Spark.
considérons l'exemple suivant:
df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
df.show()
Qui crée:
+---+---+
| A| B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
mon but est de trouver la plus grande valeur dans la colonne A (par inspection, c'est 3.0). En utilisant PySpark, voici quatre approches auxquelles je peux penser:
# Method 1: Use describe()
float(df.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A'])
# Method 2: Use SQL
df.registerTempTable("df_table")
spark.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval']
# Method 3: Use groupby()
df.groupby().max('A').collect()[0].asDict()['max(A)']
# Method 4: Convert to RDD
df.select("A").rdd.max()[0]
chacune des réponses ci-dessus donne la bonne réponse, mais en l'absence d'un outil de profilage des étincelles Je ne peux pas dire laquelle est la meilleure.
des idées d'intuition ou d'empirisme sur laquelle des méthodes ci-dessus est la plus efficace en termes de temps D'exécution de L'étincelle ou d'utilisation des ressources, ou s'il y a une méthode plus directe que celles ci-dessus?
7 réponses
>df1.show()
+-----+--------------------+--------+----------+-----------+
|floor| timestamp| uid| x| y|
+-----+--------------------+--------+----------+-----------+
| 1|2014-07-19T16:00:...|600dfbe2| 103.79211|71.50419418|
| 1|2014-07-19T16:00:...|5e7b40e1| 110.33613|100.6828393|
| 1|2014-07-19T16:00:...|285d22e4|110.066315|86.48873585|
| 1|2014-07-19T16:00:...|74d917a1| 103.78499|71.45633073|
>row1 = df1.agg({"x": "max"}).collect()[0]
>print row1
Row(max(x)=110.33613)
>print row1["max(x)"]
110.33613
La réponse est presque le même que method3. mais il semble que le "asDict ()" de la method3 puisse être supprimé
La valeur Max pour une colonne particulière d'une datagramme peut être obtenue en utilisant -
your_max_value = df.agg({"your-column": "max"}).collect()[0][0]
Remarque: Spark est destiné à travailler sur le Big Data - distributed computing. La taille de L'exemple DataFrame est très petite, de sorte que l'ordre des exemples réels peut être modifié par rapport à l'exemple small~.
Le plus lent: Method_1, parce que .décrire ("A") calcule min, max, mean, stddev, et count (5 calculs sur toute la colonne)
Moyen: Method_4, parce que, .la transformation de la DRD (de la FD à la DRD) ralentit le processus.
Plus Rapide: Method_3 ~ Method_2 ~ method_5, parce que la logique est très similaire, de sorte que L'optimiseur du catalyseur de Spark suit une logique très similaire avec un nombre minimal d'opérations (obtenir max d'une colonne particulière, collecter une base de données de valeur unique); (.asDict () ajoute un peu de temps supplémentaire en comparant 3,2 à 5)
import pandas as pd
import time
time_dict = {}
dfff = self.spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
#-- For bigger/realistic dataframe just uncomment the following 3 lines
#lst = list(np.random.normal(0.0, 100.0, 100000))
#pdf = pd.DataFrame({'A': lst, 'B': lst, 'C': lst, 'D': lst})
#dfff = self.sqlContext.createDataFrame(pdf)
tic1 = int(round(time.time() * 1000))
# Method 1: Use describe()
max_val = float(dfff.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A'])
tac1 = int(round(time.time() * 1000))
time_dict['m1']= tac1 - tic1
print (max_val)
tic2 = int(round(time.time() * 1000))
# Method 2: Use SQL
dfff.registerTempTable("df_table")
max_val = self.sqlContext.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval']
tac2 = int(round(time.time() * 1000))
time_dict['m2']= tac2 - tic2
print (max_val)
tic3 = int(round(time.time() * 1000))
# Method 3: Use groupby()
max_val = dfff.groupby().max('A').collect()[0].asDict()['max(A)']
tac3 = int(round(time.time() * 1000))
time_dict['m3']= tac3 - tic3
print (max_val)
tic4 = int(round(time.time() * 1000))
# Method 4: Convert to RDD
max_val = dfff.select("A").rdd.max()[0]
tac4 = int(round(time.time() * 1000))
time_dict['m4']= tac4 - tic4
print (max_val)
tic5 = int(round(time.time() * 1000))
# Method 4: Convert to RDD
max_val = dfff.agg({"A": "max"}).collect()[0][0]
tac5 = int(round(time.time() * 1000))
time_dict['m5']= tac5 - tic5
print (max_val)
print time_dict
Résultat sur un bord-nœud d'un cluster en millisecondes (ms):
small DF (ms): {'m1': 7096, 'm2': 205, 'm3': 165, 'm4': 211, 'm5': 180}
plus grand DF (ms): {'m1': 10260, 'm2': 452, 'm3': 465, 'm4': 916,' m5': 373}
Dans le cas où certaines des merveilles comment le faire en utilisant Scala (à l'aide de Spark 2.0.+), ici vous allez:
scala> df.createOrReplaceTempView("TEMP_DF")
scala> val myMax = spark.sql("SELECT MAX(x) as maxval FROM TEMP_DF").
collect()(0).getInt(0)
scala> print(myMax)
117
une Autre façon de faire:
df.select(f.max(f.col("A")).alias("MAX")).limit(1).collect()[0].MAX
sur mes données, j'ai obtenu ce benchmarks:
df.select(f.max(f.col("A")).alias("MAX")).limit(1).collect()[0].MAX
CPU times: user 2.31 ms, sys: 3.31 ms, total: 5.62 ms
Wall time: 3.7 s
df.select("A").rdd.max()[0]
CPU times: user 23.2 ms, sys: 13.9 ms, total: 37.1 ms
Wall time: 10.3 s
df.agg({"A": "max"}).collect()[0][0]
CPU times: user 0 ns, sys: 4.77 ms, total: 4.77 ms
Wall time: 3.75 s
Tous d'entre eux donnent la même réponse
je crois que la meilleure solution sera d'utiliser head()
Compte tenu de votre exemple:
+---+---+
| A|B/
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
En utilisant la méthode agg et max de python nous pouvons obtenir la valeur suivante :from pyspark.sql.functions import max
df.agg(max(df.A)).head()[0]
retour:
3.0
marque assurez-vous d'avoir le bon importation:from pyspark.sql.functions import max
La fonction max que nous utilisons ici est la fonction de bibliothèque sql de pySPark, et non la fonction max par défaut de python.
Voici une façon paresseuse de faire cela, en faisant simplement des statistiques de calcul:
df.write.mode("overwrite").saveAsTable("sampleStats")
Query = "ANALYZE TABLE sampleStats COMPUTE STATISTICS FOR COLUMNS " + ','.join(df.columns)
spark.sql(Query)
df.describe('ColName')
ou
spark.sql("Select * from sampleStats").describe('ColName')
ou vous pouvez ouvrir une ruche shell et
describe formatted table sampleStats;
Vous permettra de voir les statistiques dans les propriétés - min, max, distinctes, les valeurs null, etc.