PySpark: comment convertir Un tableau (i.e. liste) colonne en vecteur

version courte de la question!

Considérons le fragment de code suivant (en supposant que spark est déjà fixé à un certain SparkSession):

from pyspark.sql import Row
source_data = [
    Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
    Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), 
]
df = spark.createDataFrame(source_data)

notez que le champ températures est une liste de flotteurs. Je voudrais convertir ces listes de flotteurs au type MLlib Vector, et j'aimerais que cette conversion à être exprimé à l'aide de la base DataFrame API plutôt que de passer par RDDs (ce qui est inefficace car il envoie toutes les données de la JVM à Python, le traitement est fait en Python, nous n'avons pas les avantages de l'Étincelle du Catalyseur de l'optimiseur, yada yada). Comment dois-je faire? Plus précisément:

  1. Est-il possible d'obtenir une fonte de travail? Voir ci-dessous pour plus de détails (et une tentative ratée à une solution de contournement)? Ou, est-il une autre opération qui a pour effet que je recherchais?
  2. Quelle est la solution la plus efficace parmi les deux solutions de rechange que je propose ci-dessous (UDF vs exploser/réassembler les éléments de la liste)? Ou existe-il d'autres des alternatives presque bonnes mais pas tout à fait bonnes qui sont meilleures que l'une ou l'autre?

droite cast ne fonctionne pas

C'est ce que je m'attends à être la "bonne" solution. Je veux convertir le type d'une colonne d'un type à l'autre, de sorte que je devrais utiliser un cast. Un peu de contexte, permettez-moi de vous rappeler la façon normale de la convertir en un autre type:

from pyspark.sql import types
df_with_strings = df.select(
    df["city"], 
    df["temperatures"].cast(types.ArrayType(types.StringType()))),
)

ex:df_with_strings.collect()[0]["temperatures"][1]'-7.0'. Mais si je lance vers un vecteur ml alors les choses ne vont pas si bien:

from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))

Ce qui donne une erreur:

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)]
+- LogicalRDD [city#0, temperatures#1]
"

Beurk! Toutes les idées de comment résoudre ce problème?

alternatives possibles

Solution 1: Utilisation De VectorAssembler

il y a un Transformer qui semble idéal pour ce poste:VectorAssembler. Il prend une ou plusieurs colonnes et les concaténate en un seul vecteur. Malheureusement, il suffit de Vector et Float colonnes, pas Array colonnes, de sorte que le suivre ne fonctionne pas:

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector")
df_fail = assembler.transform(df)

Cela donne cette erreur:

pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.'

le meilleur travail que je puisse faire est d'exploser la liste en plusieurs colonnes et ensuite d'utiliser le VectorAssembler pour recueillir tous les à nouveau de retour:

from pyspark.ml.feature import VectorAssembler
TEMPERATURE_COUNT = 3
assembler_exploded = VectorAssembler(
    inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)], 
    outputCol="temperature_vector"
)
df_exploded = df.select(
    df["city"], 
    *[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)]
)
converted_df = assembler_exploded.transform(df_exploded)
final_df = converted_df.select("city", "temperature_vector")

Il me semble qu'il serait l'idéal, sauf que TEMPERATURE_COUNT plus de 100, et parfois plus de 1000. (Un autre problème est que le code serait plus compliqué si vous ne connaissez pas la taille du tableau à l'avance, bien que ce soit pas le cas pour mes données.) Spark génère-t-il réellement un ensemble de données intermédiaires avec autant de colonnes, ou considère-t-il simplement qu'il s'agit d'une étape intermédiaire que les différents articles traversent de manière transitoire (ou optimise-t-il en effet cette étape d'éloignement entièrement lorsqu'il voit que la seule utilisation de ces colonnes doit être assemblé en un vecteur)?

solution 2: utiliser un fichier UDF

une alternative plus simple est d'utiliser un UDF pour effectuer la conversion. Cela me permet d'exprimer assez directement ce que je veux faire dans une ligne de code, et ne nécessite pas de faire un ensemble de données avec un nombre fou de colonnes. Mais toutes ces données doivent être échangées entre Python et la JVM, et chaque nombre individuel doit être manipulé par Python (ce qui est notoirement lent pour itérer des éléments de données individuels). Voici à quoi ça ressemble:

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = df.select(
    df["city"], 
    list_to_vector_udf(df["temperatures"]).alias("temperatures")
)

remarques ignorables

les sections restantes de cette question rambling sont quelques choses supplémentaires que j'ai inventé en essayant pour trouver une réponse. Ils peuvent probablement être ignorés par la plupart des gens lisant ceci.

pas de solution: utiliser Vector pour commencer

dans cet exemple trivial, il est possible de créer les données en utilisant le type de vecteur pour commencer, mais bien sûr mes données ne sont pas vraiment une liste Python que je suis en parallélisation, mais sont plutôt lues à partir d'une source de données. Mais pour mémoire, voici à quoi cela ressemblerait:

from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
source_data = [
    Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])),
    Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])),
]
df = spark.createDataFrame(source_data)

solution inefficace: utiliser map()

une possibilité est d'utiliser le RDD map() méthode pour transformer la liste pour un Vector. Cela est similaire à L'idée UDF, sauf que c'est encore pire parce que le coût de la sérialisation, etc. est encouru pour tous les champs dans chaque rangée, pas seulement celui qui est opéré. Pour mémoire, voici ce que la solution pourrait ressembler à ça:

df_with_vectors = df.rdd.map(lambda row: Row(
    city=row["city"], 
    temperatures=Vectors.dense(row["temperatures"])
)).toDF()

Échec de la tentative de contournement de la fonte

En désespoir de cause, j'ai remarqué que Vector est représenté à l'intérieur par une structure à quatre champs, mais utilisant un moulage traditionnel à partir de ce type de structure ne fonctionne pas non plus. Voici une illustration (où j'ai construit la structure en utilisant un udf mais l'udf n'est pas la partie importante):

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType())
df_almost_vector = df.select(
    df["city"], 
    list_to_almost_vector_udf(df["temperatures"]).alias("temperatures")
)
df_with_vectors = df_almost_vector.select(
    df_almost_vector["city"], 
    df_almost_vector["temperatures"].cast(VectorUDT())
)

Ce qui donne l'erreur:

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)]
+- Project [city#0, <lambda>(temperatures#1) AS temperatures#5]
+- LogicalRDD [city#0, temperatures#1]
"
44
demandé sur Community 2017-02-09 16:49:50

2 réponses

Personnellement, je aller avec Python UDF et ne vous embêtez pas avec autre chose:

Mais si vous voulez vraiment d'autres options vous êtes ici:

  • Scala UDF with Python wrapper:

    Installer sbt suivre les instructions sur le site du projet.

    Créer Scala paquet avec la structure suivante:

    .
    ├── build.sbt
    └── udfs.scala
    

    Modifier build.sbt (ajuster pour refléter Scala et Spark la version):

    scalaVersion := "2.11.8"
    
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql" % "2.1.0",
      "org.apache.spark" %% "spark-mllib" % "2.1.0"
    )
    

    Modifier udfs.scala:

    package com.example.spark.udfs
    
    import org.apache.spark.sql.functions.udf
    import org.apache.spark.ml.linalg.DenseVector
    
    object udfs {
      val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray))
    }
    

    Package:

    sbt package
    

    et le comprennent (ou équivalent selon Scala vers:

    $PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar
    

    comme argument pour --driver-class-path lors du démarrage de shell / lors de la soumission de la demande.

    Dans PySpark définir un wrapper:

    from pyspark.sql.column import _to_java_column, _to_seq, Column
    from pyspark import SparkContext
    
    def as_vector(col):
        sc = SparkContext.getOrCreate()
        f = sc._jvm.com.example.spark.udfs.udfs.as_vector()
        return Column(f.apply(_to_seq(sc, [col], _to_java_column)))
    

    Test:

    with_vec = df.withColumn("vector", as_vector("temperatures"))
    with_vec.show()
    
    +--------+------------------+----------------+
    |    city|      temperatures|          vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+
    
    with_vec.printSchema()
    
    root
     |-- city: string (nullable = true)
     |-- temperatures: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- vector: vector (nullable = true)
    
  • Dump de données à un format JSON reflétant DenseVector schéma et lecture il en arrière:

    from pyspark.sql.functions import to_json, from_json, col, struct, lit
    from pyspark.sql.types import StructType, StructField
    from pyspark.ml.linalg import VectorUDT
    
    json_vec = to_json(struct(struct(
        lit(1).alias("type"),  # type 1 is dense, type 0 is sparse
        col("temperatures").alias("values")
    ).alias("v")))
    
    schema = StructType([StructField("v", VectorUDT())])
    
    with_parsed_vector = df.withColumn(
        "parsed_vector", from_json(json_vec, schema).getItem("v")
    )
    
    with_parsed_vector.show()
    
    +--------+------------------+----------------+
    |    city|      temperatures|   parsed_vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+
    
    with_parsed_vector.printSchema()
    
    root
     |-- city: string (nullable = true)
     |-- temperatures: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- parsed_vector: vector (nullable = true)
    
14
répondu user6910411 2017-05-23 12:26:01

j'ai eu le même problème que toi et j'ai fait ça. Cette façon inclut la transformation de la DRD, donc le rendement n'est pas critique, mais cela fonctionne.

from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

source_data = [
    Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
    Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), 
]
df = spark.createDataFrame(source_data)

city_rdd = df.rdd.map(lambda row:row[0])
temp_rdd = df.rdd.map(lambda row:row[1])
new_df = city_rdd.zip(temp_rdd.map(lambda x:Vectors.dense(x))).toDF(schema=['city','temperatures'])

new_df

le résultat est le suivant:

DataFrame[city: string, temperatures: vector]
3
répondu GGDammy 2018-01-19 04:43:20