SPARK SQL remplace mysql Group CONCAT aggregate function
j'ai une table de deux colonnes de type chaîne de caractères (Nom d'utilisateur, ami) et pour chaque nom d'utilisateur, je veux rassembler tous ses amis sur une rangée, concaténés en chaînes de caractères ('username1', 'friends1, friends2, friends3'). Je sais que MySql fait ça par GROUP_CONCAT, y a-t-il un moyen de faire ça avec SPARK SQL?
Merci
5 réponses
avant de procéder: cette opération est encore une autre groupByKey
. Bien qu'il ait de multiples applications légitimes, il est relativement coûteux, alors assurez-vous de l'utiliser seulement lorsque cela est nécessaire.
pas exactement solution concise ou efficace, mais vous pouvez utiliser UserDefinedAggregateFunction
introduit dans Spark 1.5.0:
object GroupConcat extends UserDefinedAggregateFunction {
def inputSchema = new StructType().add("x", StringType)
def bufferSchema = new StructType().add("buff", ArrayType(StringType))
def dataType = StringType
def deterministic = true
def initialize(buffer: MutableAggregationBuffer) = {
buffer.update(0, ArrayBuffer.empty[String])
}
def update(buffer: MutableAggregationBuffer, input: Row) = {
if (!input.isNullAt(0))
buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
}
def evaluate(buffer: Row) = UTF8String.fromString(
buffer.getSeq[String](0).mkString(","))
}
exemple d'usage:
val df = sc.parallelize(Seq(
("username1", "friend1"),
("username1", "friend2"),
("username2", "friend1"),
("username2", "friend3")
)).toDF("username", "friend")
df.groupBy($"username").agg(GroupConcat($"friend")).show
## +---------+---------------+
## | username| friends|
## +---------+---------------+
## |username1|friend1,friend2|
## |username2|friend1,friend3|
## +---------+---------------+
vous pouvez aussi créer un empaquetage Python comme montré dans Spark: comment cartographier Python avec des fonctions définies par L'utilisateur Scala ou Java?
dans la pratique, il peut être plus rapide d'extraire RDD, groupByKey
, mkString
et de reconstruire DataFrame.
vous pouvez obtenir un effet similaire en combinant la fonction collect_list
(étincelle >= 1.6.0) avec concat_ws
:
import org.apache.spark.sql.functions.{collect_list, udf, lit}
df.groupBy($"username")
.agg(concat_ws(",", collect_list($"friend")).alias("friends"))
vous pouvez essayer la fonction collect_list
sqlContext.sql("select A, collect_list(B), collect_list(C) from Table1 group by A
ou vous pouvez régier un UDF quelque chose comme
sqlContext.udf.register("myzip",(a:Long,b:Long)=>(a+","+b))
et vous pouvez utiliser cette fonction dans la requête
sqlConttext.sql("select A,collect_list(myzip(B,C)) from tbl group by A")
Une façon de le faire avec pyspark < 1.6, qui malheureusement ne prend pas en charge définis par l'utilisateur fonction d'agrégation:
byUsername = df.rdd.reduceByKey(lambda x, y: x + ", " + y)
et si vous voulez en faire une nouvelle base de données:
sqlContext.createDataFrame(byUsername, ["username", "friends"])
à partir de 1.6, Vous pouvez utiliser collect_list et ensuite rejoindre la liste créée:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
join_ = F.udf(lambda x: ", ".join(x), StringType())
df.groupBy("username").agg(join_(F.collect_list("friend").alias("friends"))
Voici une fonction que vous pouvez utiliser dans PySpark:
import pyspark.sql.functions as F
def group_concat(col, distinct=False, sep=','):
if distinct:
collect = F.collect_set(col.cast(StringType()))
else:
collect = F.collect_list(col.cast(StringType()))
return F.concat_ws(sep, collect)
table.groupby('username').agg(F.group_concat('friends').alias('friends'))
en SQL:
select username, concat_ws(',', collect_list(friends)) as friends
from table
group by username
Langue : Scala version Spark : 1.5.2
j'ai eu le même problème et j'ai aussi essayé de le résoudre en utilisant udfs
mais, malheureusement, cela a conduit à plus de problèmes plus tard dans le code en raison d'incohérences de type. J'ai pu contourner cela en convertissant d'abord le DF
en un RDD
puis groupant par et en manipulant les données de la manière désirée et puis conversion du RDD
en DF
comme suit:
val df = sc
.parallelize(Seq(
("username1", "friend1"),
("username1", "friend2"),
("username2", "friend1"),
("username2", "friend3")))
.toDF("username", "friend")
+---------+-------+
| username| friend|
+---------+-------+
|username1|friend1|
|username1|friend2|
|username2|friend1|
|username2|friend3|
+---------+-------+
val dfGRPD = df.map(Row => (Row(0), Row(1)))
.groupByKey()
.map{ case(username:String, groupOfFriends:Iterable[String]) => (username, groupOfFriends.mkString(","))}
.toDF("username", "groupOfFriends")
+---------+---------------+
| username| groupOfFriends|
+---------+---------------+
|username1|friend2,friend1|
|username2|friend3,friend1|
+---------+---------------+