Meilleure façon d'obtenir la valeur max dans une Étincelle dataframe colonne

j'essaie de trouver la meilleure façon d'obtenir la plus grande valeur dans une colonne de base de données Spark.

considérons l'exemple suivant:

df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
df.show()

Qui crée:

+---+---+
|  A|  B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+

mon but est de trouver la plus grande valeur dans la colonne A (par inspection, c'est 3.0). En utilisant PySpark, voici quatre approches auxquelles je peux penser:

# Method 1: Use describe()
float(df.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A'])

# Method 2: Use SQL
df.registerTempTable("df_table")
spark.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval']

# Method 3: Use groupby()
df.groupby().max('A').collect()[0].asDict()['max(A)']

# Method 4: Convert to RDD
df.select("A").rdd.max()[0]

chacune des réponses ci-dessus donne la bonne réponse, mais en l'absence d'un outil de profilage des étincelles Je ne peux pas dire laquelle est la meilleure.

des idées d'intuition ou d'empirisme sur laquelle des méthodes ci-dessus est la plus efficace en termes de temps D'exécution de L'étincelle ou d'utilisation des ressources, ou s'il y a une méthode plus directe que celles ci-dessus?

27
demandé sur Boern 2015-10-20 01:04:26

7 réponses

>df1.show()
+-----+--------------------+--------+----------+-----------+
|floor|           timestamp|     uid|         x|          y|
+-----+--------------------+--------+----------+-----------+
|    1|2014-07-19T16:00:...|600dfbe2| 103.79211|71.50419418|
|    1|2014-07-19T16:00:...|5e7b40e1| 110.33613|100.6828393|
|    1|2014-07-19T16:00:...|285d22e4|110.066315|86.48873585|
|    1|2014-07-19T16:00:...|74d917a1| 103.78499|71.45633073|

>row1 = df1.agg({"x": "max"}).collect()[0]
>print row1
Row(max(x)=110.33613)
>print row1["max(x)"]
110.33613

La réponse est presque le même que method3. mais il semble que le "asDict ()" de la method3 puisse être supprimé

19
répondu Burt 2016-07-12 14:17:54

La valeur Max pour une colonne particulière d'une datagramme peut être obtenue en utilisant -

your_max_value = df.agg({"your-column": "max"}).collect()[0][0]

6
répondu Rudra Prasad Samal 2017-09-11 14:09:25

Remarque: Spark est destiné à travailler sur le Big Data - distributed computing. La taille de L'exemple DataFrame est très petite, de sorte que l'ordre des exemples réels peut être modifié par rapport à l'exemple small~.

Le plus lent: Method_1, parce que .décrire ("A") calcule min, max, mean, stddev, et count (5 calculs sur toute la colonne)

Moyen: Method_4, parce que, .la transformation de la DRD (de la FD à la DRD) ralentit le processus.

Plus Rapide: Method_3 ~ Method_2 ~ method_5, parce que la logique est très similaire, de sorte que L'optimiseur du catalyseur de Spark suit une logique très similaire avec un nombre minimal d'opérations (obtenir max d'une colonne particulière, collecter une base de données de valeur unique); (.asDict () ajoute un peu de temps supplémentaire en comparant 3,2 à 5)

import pandas as pd
import time

time_dict = {}

dfff = self.spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
#--  For bigger/realistic dataframe just uncomment the following 3 lines
#lst = list(np.random.normal(0.0, 100.0, 100000))
#pdf = pd.DataFrame({'A': lst, 'B': lst, 'C': lst, 'D': lst})
#dfff = self.sqlContext.createDataFrame(pdf)

tic1 = int(round(time.time() * 1000))
# Method 1: Use describe()
max_val = float(dfff.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A'])
tac1 = int(round(time.time() * 1000))
time_dict['m1']= tac1 - tic1
print (max_val)

tic2 = int(round(time.time() * 1000))
# Method 2: Use SQL
dfff.registerTempTable("df_table")
max_val = self.sqlContext.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval']
tac2 = int(round(time.time() * 1000))
time_dict['m2']= tac2 - tic2
print (max_val)

tic3 = int(round(time.time() * 1000))
# Method 3: Use groupby()
max_val = dfff.groupby().max('A').collect()[0].asDict()['max(A)']
tac3 = int(round(time.time() * 1000))
time_dict['m3']= tac3 - tic3
print (max_val)

tic4 = int(round(time.time() * 1000))
# Method 4: Convert to RDD
max_val = dfff.select("A").rdd.max()[0]
tac4 = int(round(time.time() * 1000))
time_dict['m4']= tac4 - tic4
print (max_val)

tic5 = int(round(time.time() * 1000))
# Method 4: Convert to RDD
max_val = dfff.agg({"A": "max"}).collect()[0][0]
tac5 = int(round(time.time() * 1000))
time_dict['m5']= tac5 - tic5
print (max_val)

print time_dict

Résultat sur un bord-nœud d'un cluster en millisecondes (ms):

small DF (ms): {'m1': 7096, 'm2': 205, 'm3': 165, 'm4': 211, 'm5': 180}

plus grand DF (ms): {'m1': 10260, 'm2': 452, 'm3': 465, 'm4': 916,' m5': 373}

3
répondu Danylo Zherebetskyy 2018-02-13 22:10:22

Dans le cas où certaines des merveilles comment le faire en utilisant Scala (à l'aide de Spark 2.0.+), ici vous allez:

scala> df.createOrReplaceTempView("TEMP_DF")
scala> val myMax = spark.sql("SELECT MAX(x) as maxval FROM TEMP_DF").
    collect()(0).getInt(0)
scala> print(myMax)
117
2
répondu Boern 2016-11-22 10:29:34

une Autre façon de faire:

df.select(f.max(f.col("A")).alias("MAX")).limit(1).collect()[0].MAX

sur mes données, j'ai obtenu ce benchmarks:

df.select(f.max(f.col("A")).alias("MAX")).limit(1).collect()[0].MAX
CPU times: user 2.31 ms, sys: 3.31 ms, total: 5.62 ms
Wall time: 3.7 s

df.select("A").rdd.max()[0]
CPU times: user 23.2 ms, sys: 13.9 ms, total: 37.1 ms
Wall time: 10.3 s

df.agg({"A": "max"}).collect()[0][0]
CPU times: user 0 ns, sys: 4.77 ms, total: 4.77 ms
Wall time: 3.75 s

Tous d'entre eux donnent la même réponse

1
répondu luminousmen 2018-09-03 16:30:39

je crois que la meilleure solution sera d'utiliser head()

Compte tenu de votre exemple:

+---+---+

| A|B/

+---+---+

|1.0|4.0|

|2.0|5.0|

|3.0|6.0|

+---+---+

En utilisant la méthode agg et max de python nous pouvons obtenir la valeur suivante :



from pyspark.sql.functions import max df.agg(max(df.A)).head()[0]

retour: 3.0

marque assurez-vous d'avoir le bon importation:

from pyspark.sql.functions import max La fonction max que nous utilisons ici est la fonction de bibliothèque sql de pySPark, et non la fonction max par défaut de python.

0
répondu Vyom Shrivastava 2018-07-17 00:41:13

Voici une façon paresseuse de faire cela, en faisant simplement des statistiques de calcul:

df.write.mode("overwrite").saveAsTable("sampleStats")
Query = "ANALYZE TABLE sampleStats COMPUTE STATISTICS FOR COLUMNS " + ','.join(df.columns)
spark.sql(Query)

df.describe('ColName')

ou

spark.sql("Select * from sampleStats").describe('ColName')

ou vous pouvez ouvrir une ruche shell et

describe formatted table sampleStats;

Vous permettra de voir les statistiques dans les propriétés - min, max, distinctes, les valeurs null, etc.

0
répondu user 923227 2018-09-12 18:58:44