Supprimer les doublons des lignes basées sur des colonnes spécifiques dans une base de données RDD/Spark

disons que j'ai un assez vaste ensemble de données sous la forme suivante:

data = sc.parallelize([('Foo',41,'US',3),
                       ('Foo',39,'UK',1),
                       ('Bar',57,'CA',2),
                       ('Bar',72,'CA',2),
                       ('Baz',22,'US',6),
                       ('Baz',36,'US',6)])

ce que je voudrais faire est de supprimer les lignes dupliquées basées sur les valeurs des première,troisième et quatrième colonnes seulement.

supprimer entièrement les lignes dupliquées est simple:

data = data.distinct()

et la ligne 5 ou ligne 6 sera supprimé

mais comment puis-je supprimer seulement les lignes dupliquées basées sur les colonnes 1, 3 et 4 seulement? c'est à dire supprimer soit un des les présentes:

('Baz',22,'US',6)
('Baz',36,'US',6)

en Python, cela peut être fait en spécifiant des colonnes avec .drop_duplicates(). Comment puis-je obtenir la même chose dans Spark/Pyspark?

27
demandé sur Jason 2015-05-15 01:03:24

7 réponses

Pyspark inclure un dropDuplicates() méthode. https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates

>>> from pyspark.sql import Row
>>> df = sc.parallelize([ \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+

>>> df.dropDuplicates(['name', 'height']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
+---+------+-----+

peut-être qu'il a été introduit dans une version plus tardive que celle utilisée par @Jason (OP)?

edit: ouais, il a été introduit dans la version 1.4

30
répondu vaer-k 2016-10-05 18:21:57

D'après votre question, il n'est pas clair à quelles colonnes vous voulez utiliser pour déterminer les doublons. L'idée générale derrière la solution est de créer une clé basée sur les valeurs des colonnes qui permettent d'identifier les doublons. Ensuite, vous pouvez utiliser la reduceByKey ou réduire les opérations pour éliminer les doublons.

Voici le code:

def get_key(x):
    return "{0}{1}{2}".format(x[0],x[2],x[3])

m = data.map(lambda x: (get_key(x),x))

Maintenant, vous avez une clé-valeur RDD qui est saisi par les colonnes 1, 3 et 4. La prochaine étape serait un reduceByKey ou groupByKey et filter. Cela éliminerait les doublons.

r = m.reduceByKey(lambda x,y: (x))
19
répondu Mike 2016-04-18 12:40:06

D'accord avec David. Pour ajouter, il pas être le cas que nous voulons groupe toutes les colonnes autres que celles de la fonction agrégée I. e, si nous voulons supprimer les doublons purement basés sur un sous-ensemble de colonnes et conserver toutes les colonnes dans le datagramme original. Donc la meilleure façon de le faire pourrait être d'utiliser dropDuplicates api Dataframe disponible dans Spark 1.4.0

Pour référence, voir: https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrame

10
répondu technotring 2015-09-10 13:04:06

je sais que vous avez déjà accepté l'autre réponse, mais si vous voulez faire cela comme un DataFrame, utilisez groupBy et agg. En supposant que vous avez déjà créé un DF (avec des colonnes nommées "col1", "col2", etc) Vous pouvez faire:

myDF.groupBy($"col1", $"col3", $"col4").agg($"col1", max($"col2"), $"col3", $"col4")

notez que dans ce cas, j'ai choisi le Max de col2, mais vous pouvez faire avg, min, etc.

9
répondu David Griffin 2015-05-15 10:54:27

j'ai utilisé la fonction intégrée dropDuplicates(). Code Scala indiqué ci-dessous

val data = sc.parallelize(List(("Foo",41,"US",3),
("Foo",39,"UK",1),
("Bar",57,"CA",2),
("Bar",72,"CA",2),
("Baz",22,"US",6),
("Baz",36,"US",6))).toDF("x","y","z","count")

data.dropDuplicates(Array("x","count")).show()

Sortie :

+---+---+---+-----+
|  x|  y|  z|count|
+---+---+---+-----+
|Baz| 22| US|    6|
|Foo| 39| UK|    1|
|Foo| 41| US|    3|
|Bar| 57| CA|    2|
+---+---+---+-----+
7
répondu Aravind Krishnakumar 2016-11-18 03:11:29

c'est mon DF contain 4 est répété deux fois donc ici va supprimer les valeurs répétées.

scala> df.show
+-----+
|value|
+-----+
|    1|
|    4|
|    3|
|    5|
|    4|
|   18|
+-----+

scala> val newdf=df.dropDuplicates

scala> newdf.show
+-----+
|value|
+-----+
|    1|
|    3|
|    5|
|    4|
|   18|
+-----+
0
répondu Nilesh Shinde 2017-11-10 07:30:38

le programme ci-dessous vous aidera à supprimer les doublons en entier , ou si vous voulez supprimer les doublons basés sur certaines colonnes , vous pouvez même le faire:

import org.apache.spark.sql.SparkSession

object DropDuplicates {
def main(args: Array[String]) {
val spark =
  SparkSession.builder()
    .appName("DataFrame-DropDuplicates")
    .master("local[4]")
    .getOrCreate()

import spark.implicits._

// create an RDD of tuples with some data
val custs = Seq(
  (1, "Widget Co", 120000.00, 0.00, "AZ"),
  (2, "Acme Widgets", 410500.00, 500.00, "CA"),
  (3, "Widgetry", 410500.00, 200.00, "CA"),
  (4, "Widgets R Us", 410500.00, 0.0, "CA"),
  (3, "Widgetry", 410500.00, 200.00, "CA"),
  (5, "Ye Olde Widgete", 500.00, 0.0, "MA"),
  (6, "Widget Co", 12000.00, 10.00, "AZ")
)
val customerRows = spark.sparkContext.parallelize(custs, 4)

// convert RDD of tuples to DataFrame by supplying column names
val customerDF = customerRows.toDF("id", "name", "sales", "discount", "state")

println("*** Here's the whole DataFrame with duplicates")

customerDF.printSchema()

customerDF.show()

// drop fully identical rows
val withoutDuplicates = customerDF.dropDuplicates()

println("*** Now without duplicates")

withoutDuplicates.show()

// drop fully identical rows
val withoutPartials = customerDF.dropDuplicates(Seq("name", "state"))

println("*** Now without partial duplicates too")

withoutPartials.show()

 }
 }
0
répondu Sampat Kumar 2018-05-21 14:37:24