PySpark: comment convertir Un tableau (i.e. liste) colonne en vecteur
version courte de la question!
Considérons le fragment de code suivant (en supposant que spark
est déjà fixé à un certain SparkSession
):
from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
Row(city="New York", temperatures=[-7.0, -7.0, -5.0]),
]
df = spark.createDataFrame(source_data)
notez que le champ températures est une liste de flotteurs. Je voudrais convertir ces listes de flotteurs au type MLlib Vector
, et j'aimerais que cette conversion à être exprimé à l'aide de la base DataFrame
API plutôt que de passer par RDDs (ce qui est inefficace car il envoie toutes les données de la JVM à Python, le traitement est fait en Python, nous n'avons pas les avantages de l'Étincelle du Catalyseur de l'optimiseur, yada yada). Comment dois-je faire? Plus précisément:
- Est-il possible d'obtenir une fonte de travail? Voir ci-dessous pour plus de détails (et une tentative ratée à une solution de contournement)? Ou, est-il une autre opération qui a pour effet que je recherchais?
- Quelle est la solution la plus efficace parmi les deux solutions de rechange que je propose ci-dessous (UDF vs exploser/réassembler les éléments de la liste)? Ou existe-il d'autres des alternatives presque bonnes mais pas tout à fait bonnes qui sont meilleures que l'une ou l'autre?
droite cast ne fonctionne pas
C'est ce que je m'attends à être la "bonne" solution. Je veux convertir le type d'une colonne d'un type à l'autre, de sorte que je devrais utiliser un cast. Un peu de contexte, permettez-moi de vous rappeler la façon normale de la convertir en un autre type:
from pyspark.sql import types
df_with_strings = df.select(
df["city"],
df["temperatures"].cast(types.ArrayType(types.StringType()))),
)
ex:df_with_strings.collect()[0]["temperatures"][1]
'-7.0'
. Mais si je lance vers un vecteur ml alors les choses ne vont pas si bien:
from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))
Ce qui donne une erreur:
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)]
+- LogicalRDD [city#0, temperatures#1]
"
Beurk! Toutes les idées de comment résoudre ce problème?
alternatives possibles
Solution 1: Utilisation De VectorAssembler
il y a un Transformer
qui semble idéal pour ce poste:VectorAssembler
. Il prend une ou plusieurs colonnes et les concaténate en un seul vecteur. Malheureusement, il suffit de Vector
et Float
colonnes, pas Array
colonnes, de sorte que le suivre ne fonctionne pas:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector")
df_fail = assembler.transform(df)
Cela donne cette erreur:
pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.'
le meilleur travail que je puisse faire est d'exploser la liste en plusieurs colonnes et ensuite d'utiliser le VectorAssembler
pour recueillir tous les à nouveau de retour:
from pyspark.ml.feature import VectorAssembler
TEMPERATURE_COUNT = 3
assembler_exploded = VectorAssembler(
inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)],
outputCol="temperature_vector"
)
df_exploded = df.select(
df["city"],
*[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)]
)
converted_df = assembler_exploded.transform(df_exploded)
final_df = converted_df.select("city", "temperature_vector")
Il me semble qu'il serait l'idéal, sauf que TEMPERATURE_COUNT
plus de 100, et parfois plus de 1000. (Un autre problème est que le code serait plus compliqué si vous ne connaissez pas la taille du tableau à l'avance, bien que ce soit pas le cas pour mes données.) Spark génère-t-il réellement un ensemble de données intermédiaires avec autant de colonnes, ou considère-t-il simplement qu'il s'agit d'une étape intermédiaire que les différents articles traversent de manière transitoire (ou optimise-t-il en effet cette étape d'éloignement entièrement lorsqu'il voit que la seule utilisation de ces colonnes doit être assemblé en un vecteur)?
solution 2: utiliser un fichier UDF
une alternative plus simple est d'utiliser un UDF pour effectuer la conversion. Cela me permet d'exprimer assez directement ce que je veux faire dans une ligne de code, et ne nécessite pas de faire un ensemble de données avec un nombre fou de colonnes. Mais toutes ces données doivent être échangées entre Python et la JVM, et chaque nombre individuel doit être manipulé par Python (ce qui est notoirement lent pour itérer des éléments de données individuels). Voici à quoi ça ressemble:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = df.select(
df["city"],
list_to_vector_udf(df["temperatures"]).alias("temperatures")
)
remarques ignorables
les sections restantes de cette question rambling sont quelques choses supplémentaires que j'ai inventé en essayant pour trouver une réponse. Ils peuvent probablement être ignorés par la plupart des gens lisant ceci.
pas de solution: utiliser Vector
pour commencer
dans cet exemple trivial, il est possible de créer les données en utilisant le type de vecteur pour commencer, mais bien sûr mes données ne sont pas vraiment une liste Python que je suis en parallélisation, mais sont plutôt lues à partir d'une source de données. Mais pour mémoire, voici à quoi cela ressemblerait:
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])),
Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])),
]
df = spark.createDataFrame(source_data)
solution inefficace: utiliser map()
une possibilité est d'utiliser le RDD map()
méthode pour transformer la liste pour un Vector
. Cela est similaire à L'idée UDF, sauf que c'est encore pire parce que le coût de la sérialisation, etc. est encouru pour tous les champs dans chaque rangée, pas seulement celui qui est opéré. Pour mémoire, voici ce que la solution pourrait ressembler à ça:
df_with_vectors = df.rdd.map(lambda row: Row(
city=row["city"],
temperatures=Vectors.dense(row["temperatures"])
)).toDF()
Échec de la tentative de contournement de la fonte
En désespoir de cause, j'ai remarqué que Vector
est représenté à l'intérieur par une structure à quatre champs, mais utilisant un moulage traditionnel à partir de ce type de structure ne fonctionne pas non plus. Voici une illustration (où j'ai construit la structure en utilisant un udf mais l'udf n'est pas la partie importante):
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType())
df_almost_vector = df.select(
df["city"],
list_to_almost_vector_udf(df["temperatures"]).alias("temperatures")
)
df_with_vectors = df_almost_vector.select(
df_almost_vector["city"],
df_almost_vector["temperatures"].cast(VectorUDT())
)
Ce qui donne l'erreur:
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)]
+- Project [city#0, <lambda>(temperatures#1) AS temperatures#5]
+- LogicalRDD [city#0, temperatures#1]
"
2 réponses
Personnellement, je aller avec Python UDF et ne vous embêtez pas avec autre chose:
Vectors
ne sont pas des types SQL natifs donc il y aura des performances au-dessus d'une manière ou d'une autre. En particulier, ce processus nécessite deux étapes où les données sont d'abord conversion de type externe en ligne et à partir de la ligne de la représentation interne de l'utilisation de génériqueRowEncoder
.- tout ML en aval
Pipeline
sera beaucoup plus cher qu'un de la simple conversion. En outre, il exige un processus inverse à celui décrit ci-dessus
Mais si vous voulez vraiment d'autres options vous êtes ici:
Scala UDF with Python wrapper:
Installer sbt suivre les instructions sur le site du projet.
Créer Scala paquet avec la structure suivante:
. ├── build.sbt └── udfs.scala
Modifier
build.sbt
(ajuster pour refléter Scala et Spark la version):scalaVersion := "2.11.8" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-sql" % "2.1.0", "org.apache.spark" %% "spark-mllib" % "2.1.0" )
Modifier
udfs.scala
:package com.example.spark.udfs import org.apache.spark.sql.functions.udf import org.apache.spark.ml.linalg.DenseVector object udfs { val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray)) }
Package:
sbt package
et le comprennent (ou équivalent selon Scala vers:
$PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar
comme argument pour
--driver-class-path
lors du démarrage de shell / lors de la soumission de la demande.Dans PySpark définir un wrapper:
from pyspark.sql.column import _to_java_column, _to_seq, Column from pyspark import SparkContext def as_vector(col): sc = SparkContext.getOrCreate() f = sc._jvm.com.example.spark.udfs.udfs.as_vector() return Column(f.apply(_to_seq(sc, [col], _to_java_column)))
Test:
with_vec = df.withColumn("vector", as_vector("temperatures")) with_vec.show()
+--------+------------------+----------------+ | city| temperatures| vector| +--------+------------------+----------------+ | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]| |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]| +--------+------------------+----------------+ with_vec.printSchema()
root |-- city: string (nullable = true) |-- temperatures: array (nullable = true) | |-- element: double (containsNull = true) |-- vector: vector (nullable = true)
Dump de données à un format JSON reflétant
DenseVector
schéma et lecture il en arrière:from pyspark.sql.functions import to_json, from_json, col, struct, lit from pyspark.sql.types import StructType, StructField from pyspark.ml.linalg import VectorUDT json_vec = to_json(struct(struct( lit(1).alias("type"), # type 1 is dense, type 0 is sparse col("temperatures").alias("values") ).alias("v"))) schema = StructType([StructField("v", VectorUDT())]) with_parsed_vector = df.withColumn( "parsed_vector", from_json(json_vec, schema).getItem("v") ) with_parsed_vector.show()
+--------+------------------+----------------+ | city| temperatures| parsed_vector| +--------+------------------+----------------+ | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]| |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]| +--------+------------------+----------------+
with_parsed_vector.printSchema()
root |-- city: string (nullable = true) |-- temperatures: array (nullable = true) | |-- element: double (containsNull = true) |-- parsed_vector: vector (nullable = true)
j'ai eu le même problème que toi et j'ai fait ça. Cette façon inclut la transformation de la DRD, donc le rendement n'est pas critique, mais cela fonctionne.
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
source_data = [
Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
Row(city="New York", temperatures=[-7.0, -7.0, -5.0]),
]
df = spark.createDataFrame(source_data)
city_rdd = df.rdd.map(lambda row:row[0])
temp_rdd = df.rdd.map(lambda row:row[1])
new_df = city_rdd.zip(temp_rdd.map(lambda x:Vectors.dense(x))).toDF(schema=['city','temperatures'])
new_df
le résultat est le suivant:
DataFrame[city: string, temperatures: vector]