Supprimer les doublons des lignes basées sur des colonnes spécifiques dans une base de données RDD/Spark
disons que j'ai un assez vaste ensemble de données sous la forme suivante:
data = sc.parallelize([('Foo',41,'US',3),
('Foo',39,'UK',1),
('Bar',57,'CA',2),
('Bar',72,'CA',2),
('Baz',22,'US',6),
('Baz',36,'US',6)])
ce que je voudrais faire est de supprimer les lignes dupliquées basées sur les valeurs des première,troisième et quatrième colonnes seulement.
supprimer entièrement les lignes dupliquées est simple:
data = data.distinct()
et la ligne 5 ou ligne 6 sera supprimé
mais comment puis-je supprimer seulement les lignes dupliquées basées sur les colonnes 1, 3 et 4 seulement? c'est à dire supprimer soit un des les présentes:
('Baz',22,'US',6)
('Baz',36,'US',6)
en Python, cela peut être fait en spécifiant des colonnes avec .drop_duplicates()
. Comment puis-je obtenir la même chose dans Spark/Pyspark?
7 réponses
Pyspark inclure un dropDuplicates()
méthode. https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
>>> from pyspark.sql import Row
>>> df = sc.parallelize([ \
... Row(name='Alice', age=5, height=80), \
... Row(name='Alice', age=5, height=80), \
... Row(name='Alice', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 5| 80|Alice|
| 10| 80|Alice|
+---+------+-----+
>>> df.dropDuplicates(['name', 'height']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 5| 80|Alice|
+---+------+-----+
peut-être qu'il a été introduit dans une version plus tardive que celle utilisée par @Jason (OP)?
edit: ouais, il a été introduit dans la version 1.4
D'après votre question, il n'est pas clair à quelles colonnes vous voulez utiliser pour déterminer les doublons. L'idée générale derrière la solution est de créer une clé basée sur les valeurs des colonnes qui permettent d'identifier les doublons. Ensuite, vous pouvez utiliser la reduceByKey ou réduire les opérations pour éliminer les doublons.
Voici le code:
def get_key(x):
return "{0}{1}{2}".format(x[0],x[2],x[3])
m = data.map(lambda x: (get_key(x),x))
Maintenant, vous avez une clé-valeur RDD
qui est saisi par les colonnes 1, 3 et 4.
La prochaine étape serait un reduceByKey
ou groupByKey
et filter
.
Cela éliminerait les doublons.
r = m.reduceByKey(lambda x,y: (x))
D'accord avec David. Pour ajouter, il pas être le cas que nous voulons groupe toutes les colonnes autres que celles de la fonction agrégée I. e, si nous voulons supprimer les doublons purement basés sur un sous-ensemble de colonnes et conserver toutes les colonnes dans le datagramme original. Donc la meilleure façon de le faire pourrait être d'utiliser dropDuplicates api Dataframe disponible dans Spark 1.4.0
Pour référence, voir: https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrame
je sais que vous avez déjà accepté l'autre réponse, mais si vous voulez faire cela comme un DataFrame, utilisez groupBy et agg. En supposant que vous avez déjà créé un DF (avec des colonnes nommées "col1", "col2", etc) Vous pouvez faire:
myDF.groupBy($"col1", $"col3", $"col4").agg($"col1", max($"col2"), $"col3", $"col4")
notez que dans ce cas, j'ai choisi le Max de col2, mais vous pouvez faire avg, min, etc.
j'ai utilisé la fonction intégrée dropDuplicates(). Code Scala indiqué ci-dessous
val data = sc.parallelize(List(("Foo",41,"US",3),
("Foo",39,"UK",1),
("Bar",57,"CA",2),
("Bar",72,"CA",2),
("Baz",22,"US",6),
("Baz",36,"US",6))).toDF("x","y","z","count")
data.dropDuplicates(Array("x","count")).show()
Sortie :
+---+---+---+-----+
| x| y| z|count|
+---+---+---+-----+
|Baz| 22| US| 6|
|Foo| 39| UK| 1|
|Foo| 41| US| 3|
|Bar| 57| CA| 2|
+---+---+---+-----+
c'est mon DF contain 4 est répété deux fois donc ici va supprimer les valeurs répétées.
scala> df.show
+-----+
|value|
+-----+
| 1|
| 4|
| 3|
| 5|
| 4|
| 18|
+-----+
scala> val newdf=df.dropDuplicates
scala> newdf.show
+-----+
|value|
+-----+
| 1|
| 3|
| 5|
| 4|
| 18|
+-----+
le programme ci-dessous vous aidera à supprimer les doublons en entier , ou si vous voulez supprimer les doublons basés sur certaines colonnes , vous pouvez même le faire:
import org.apache.spark.sql.SparkSession
object DropDuplicates {
def main(args: Array[String]) {
val spark =
SparkSession.builder()
.appName("DataFrame-DropDuplicates")
.master("local[4]")
.getOrCreate()
import spark.implicits._
// create an RDD of tuples with some data
val custs = Seq(
(1, "Widget Co", 120000.00, 0.00, "AZ"),
(2, "Acme Widgets", 410500.00, 500.00, "CA"),
(3, "Widgetry", 410500.00, 200.00, "CA"),
(4, "Widgets R Us", 410500.00, 0.0, "CA"),
(3, "Widgetry", 410500.00, 200.00, "CA"),
(5, "Ye Olde Widgete", 500.00, 0.0, "MA"),
(6, "Widget Co", 12000.00, 10.00, "AZ")
)
val customerRows = spark.sparkContext.parallelize(custs, 4)
// convert RDD of tuples to DataFrame by supplying column names
val customerDF = customerRows.toDF("id", "name", "sales", "discount", "state")
println("*** Here's the whole DataFrame with duplicates")
customerDF.printSchema()
customerDF.show()
// drop fully identical rows
val withoutDuplicates = customerDF.dropDuplicates()
println("*** Now without duplicates")
withoutDuplicates.show()
// drop fully identical rows
val withoutPartials = customerDF.dropDuplicates(Seq("name", "state"))
println("*** Now without partial duplicates too")
withoutPartials.show()
}
}