SPARK SQL remplace mysql Group CONCAT aggregate function

j'ai une table de deux colonnes de type chaîne de caractères (Nom d'utilisateur, ami) et pour chaque nom d'utilisateur, je veux rassembler tous ses amis sur une rangée, concaténés en chaînes de caractères ('username1', 'friends1, friends2, friends3'). Je sais que MySql fait ça par GROUP_CONCAT, y a-t-il un moyen de faire ça avec SPARK SQL?

Merci

25
demandé sur zero323 2015-07-26 21:55:36

5 réponses

avant de procéder: cette opération est encore une autre groupByKey . Bien qu'il ait de multiples applications légitimes, il est relativement coûteux, alors assurez-vous de l'utiliser seulement lorsque cela est nécessaire.


pas exactement solution concise ou efficace, mais vous pouvez utiliser UserDefinedAggregateFunction introduit dans Spark 1.5.0:

object GroupConcat extends UserDefinedAggregateFunction {
    def inputSchema = new StructType().add("x", StringType)
    def bufferSchema = new StructType().add("buff", ArrayType(StringType))
    def dataType = StringType
    def deterministic = true 

    def initialize(buffer: MutableAggregationBuffer) = {
      buffer.update(0, ArrayBuffer.empty[String])
    }

    def update(buffer: MutableAggregationBuffer, input: Row) = {
      if (!input.isNullAt(0)) 
        buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))
    }

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
      buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
    }

    def evaluate(buffer: Row) = UTF8String.fromString(
      buffer.getSeq[String](0).mkString(","))
}

exemple d'usage:

val df = sc.parallelize(Seq(
  ("username1", "friend1"),
  ("username1", "friend2"),
  ("username2", "friend1"),
  ("username2", "friend3")
)).toDF("username", "friend")

df.groupBy($"username").agg(GroupConcat($"friend")).show

## +---------+---------------+
## | username|        friends|
## +---------+---------------+
## |username1|friend1,friend2|
## |username2|friend1,friend3|
## +---------+---------------+

vous pouvez aussi créer un empaquetage Python comme montré dans Spark: comment cartographier Python avec des fonctions définies par L'utilisateur Scala ou Java?

dans la pratique, il peut être plus rapide d'extraire RDD, groupByKey , mkString et de reconstruire DataFrame.

vous pouvez obtenir un effet similaire en combinant la fonction collect_list (étincelle >= 1.6.0) avec concat_ws :

import org.apache.spark.sql.functions.{collect_list, udf, lit}

df.groupBy($"username")
  .agg(concat_ws(",", collect_list($"friend")).alias("friends"))
35
répondu zero323 2017-10-02 06:19:53

vous pouvez essayer la fonction collect_list

sqlContext.sql("select A, collect_list(B), collect_list(C) from Table1 group by A

ou vous pouvez régier un UDF quelque chose comme

sqlContext.udf.register("myzip",(a:Long,b:Long)=>(a+","+b))

et vous pouvez utiliser cette fonction dans la requête

sqlConttext.sql("select A,collect_list(myzip(B,C)) from tbl group by A")
13
répondu iec2011007 2016-02-16 09:24:01

Une façon de le faire avec pyspark < 1.6, qui malheureusement ne prend pas en charge définis par l'utilisateur fonction d'agrégation:

byUsername = df.rdd.reduceByKey(lambda x, y: x + ", " + y)

et si vous voulez en faire une nouvelle base de données:

sqlContext.createDataFrame(byUsername, ["username", "friends"])

à partir de 1.6, Vous pouvez utiliser collect_list et ensuite rejoindre la liste créée:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType
join_ = F.udf(lambda x: ", ".join(x), StringType())
df.groupBy("username").agg(join_(F.collect_list("friend").alias("friends"))
2
répondu ksindi 2016-01-25 00:07:13

Voici une fonction que vous pouvez utiliser dans PySpark:

import pyspark.sql.functions as F

def group_concat(col, distinct=False, sep=','):
    if distinct:
        collect = F.collect_set(col.cast(StringType()))
    else:
        collect = F.collect_list(col.cast(StringType()))
    return F.concat_ws(sep, collect)


table.groupby('username').agg(F.group_concat('friends').alias('friends'))

en SQL:

select username, concat_ws(',', collect_list(friends)) as friends
from table
group by username
2
répondu rikturr 2018-04-06 17:55:34

Langue : Scala version Spark : 1.5.2

j'ai eu le même problème et j'ai aussi essayé de le résoudre en utilisant udfs mais, malheureusement, cela a conduit à plus de problèmes plus tard dans le code en raison d'incohérences de type. J'ai pu contourner cela en convertissant d'abord le DF en un RDD puis groupant par et en manipulant les données de la manière désirée et puis conversion du RDD en DF comme suit:

val df = sc
     .parallelize(Seq(
        ("username1", "friend1"),
        ("username1", "friend2"),
        ("username2", "friend1"),
        ("username2", "friend3")))
     .toDF("username", "friend")

+---------+-------+
| username| friend|
+---------+-------+
|username1|friend1|
|username1|friend2|
|username2|friend1|
|username2|friend3|
+---------+-------+

val dfGRPD = df.map(Row => (Row(0), Row(1)))
     .groupByKey()
     .map{ case(username:String, groupOfFriends:Iterable[String]) => (username, groupOfFriends.mkString(","))}
     .toDF("username", "groupOfFriends")

+---------+---------------+
| username| groupOfFriends|
+---------+---------------+
|username1|friend2,friend1|
|username2|friend3,friend1|
+---------+---------------+
1
répondu agent_C.Hdj 2016-12-15 10:59:59