Spark: comment mapper Python avec des fonctions définies par L'utilisateur Scala ou Java?

disons par exemple que mon équipe a choisi Python comme langage de référence à développer avec Spark. Mais plus tard, pour des raisons de performance, nous aimerions développer des librairies spécifiques Scala ou Java afin de les mapper avec notre code Python (quelque chose de similaire à des bouts de Python avec des squelettes Scala ou Java).

ne pensez-vous pas qu'il est possible d'interfacer de nouvelles méthodes Python personnalisées avec sous le capot quelques fonctions définies par L'utilisateur Scala ou Java ?

16
demandé sur prossblad 2015-10-20 13:06:08

1 réponses

étincelle 2.1+

vous pouvez utiliser SQLContext.registerJavaFunction :

enregistre un UDF java pour qu'il puisse être utilisé dans les instructions SQL.

qui nécessite un name , nom complet de la classe Java, et type de retour optionnel. Malheureusement pour le moment, il ne peut être utilisé que dans les instructions SQL (ou avec expr / selectExpr ) et nécessite une org.apache.spark.sql.api.java.UDF* :

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.1.0"
)
package com.example.spark.udfs

import org.apache.spark.sql.api.java.UDF1

class addOne extends UDF1[Integer, Integer] {
  def call(x: Integer) = x + 1
} 
sqlContext.registerJavaFunction("add_one", "com.example.spark.udfs.addOne")
sqlContext.sql("SELECT add_one(1)").show()

## +------+
## |UDF(1)|
## +------+
## |     2|
## +------+

version indépendante :

je n'irais pas jusqu'à dire qu'il est pris en charge, mais il est certainement possible. Toutes les fonctions SQL disponibles actuellement dans PySpark sont simplement un wrappers autour de L'API Scala.

laisse supposer que je veux réutiliser GroupConcat UDAF que j'ai créé comme réponse à SPARK SQL remplacement pour mysql GROUP_CONCAT fonction agrégée et elle est située dans un paquet com.example.udaf :

from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql import Row

row = Row("k", "v")
df = sc.parallelize([
    row(1, "foo1"), row(1, "foo2"), row(2, "bar1"), row(2, "bar2")]).toDF()

def groupConcat(col):
    """Group and concatenate values for a given column

    >>> df = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
    >>> df.select(groupConcat("v").alias("vs"))
    [Row(vs=u'foo,bar')]
    """
    sc = SparkContext._active_spark_context
    # It is possible to use java_import to avoid full package path
    _groupConcat = sc._jvm.com.example.udaf.GroupConcat.apply
    # Converting to Seq to match apply(exprs: Column*)
    return Column(_groupConcat(_to_seq(sc, [col], _to_java_column)))

df.groupBy("k").agg(groupConcat("v").alias("vs")).show()

## +---+---------+
## |  k|       vs|
## +---+---------+
## |  1|foo1,foo2|
## |  2|bar1,bar2|
## +---+---------+

Il y a beaucoup trop de leaders souligne à mon goût mais comme vous pouvez le voir, il peut être fait.

liée à:

22
répondu zero323 2017-09-01 10:37:33