Comment convertir un objet rdd en dataframe en spark
comment convertir un RDD ( org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
) en Dataframe org.apache.spark.sql.DataFrame
. J'ai converti une base de données en rdd en utilisant .rdd
. Après le traitement, je le veux dans dataframe. Comment puis-je faire cela ?
10 réponses
SqlContext
a un certain nombre de createDataFrame
méthodes qui créent un DataFrame
donné un RDD
. J'imagine que l'un d'eux conviendra à votre contexte.
par exemple:
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
crée une base de données à partir d'un RDD contenant des lignes en utilisant le schéma.
en supposant que votre RDD [ligne] s'appelle rdd, vous pouvez utiliser:
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
rdd.toDF()
ce code fonctionne parfaitement de Spark 2.x avec Scala 2.11
Importer les classes
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
"créer SparkSession
objet, ici c'est spark
val spark: SparkSession = SparkSession.builder.master("local").getOrCreate
val sc = spark.sparkContext // Just used to create test RDDs
disons un RDD
pour en faire un DataFrame
val rdd = sc.parallelize(
Seq(
("first", Array(2.0, 1.0, 2.1, 5.4)),
("test", Array(1.5, 0.5, 0.9, 3.7)),
("choose", Array(8.0, 2.9, 9.1, 2.5))
)
)
Méthode 1
par SparkSession.createDataFrame(RDD obj)
.
val dfWithoutSchema = spark.createDataFrame(rdd)
dfWithoutSchema.show()
+------+--------------------+
| _1| _2|
+------+--------------------+
| first|[2.0, 1.0, 2.1, 5.4]|
| test|[1.5, 0.5, 0.9, 3.7]|
|choose|[8.0, 2.9, 9.1, 2.5]|
+------+--------------------+
Méthode 2 1519300920"
en utilisant SparkSession.createDataFrame(RDD obj)
et en spécifiant le nom des colonnes.
val dfWithSchema = spark.createDataFrame(rdd).toDF("id", "vals")
dfWithSchema.show()
+------+--------------------+
| id| vals|
+------+--------------------+
| first|[2.0, 1.0, 2.1, 5.4]|
| test|[1.5, 0.5, 0.9, 3.7]|
|choose|[8.0, 2.9, 9.1, 2.5]|
+------+--------------------+
la Méthode 3 (Réelle réponse à la question)
de cette façon, l'entrée rdd
doit être du type RDD[Row]
.
val rowsRdd: RDD[Row] = sc.parallelize(
Seq(
Row("first", 2.0, 7.0),
Row("second", 3.5, 2.5),
Row("third", 7.0, 5.9)
)
)
créer le schéma
val schema = new StructType()
.add(StructField("id", StringType, true))
.add(StructField("val1", DoubleType, true))
.add(StructField("val2", DoubleType, true))
Appliquer maintenant les deux rowsRdd
et schema
à createDataFrame()
val df = spark.createDataFrame(rowsRdd, schema)
df.show()
+------+----+----+
| id|val1|val2|
+------+----+----+
| first| 2.0| 7.0|
|second| 3.5| 2.5|
| third| 7.0| 5.9|
+------+----+----+
supposons que vous ayez un DataFrame
et que vous voulez modifier les données des champs en les convertissant en RDD[Row]
.
val aRdd = aDF.map(x=>Row(x.getAs[Long]("id"),x.getAs[List[String]]("role").head))
pour revenir à DataFrame
de RDD
nous avons besoin de définir le type de structure du RDD
.
si le type de données était Long
alors il deviendra comme LongType
dans la structure.
si String
alors StringType
de structure.
val aStruct = new StructType(Array(StructField("id",LongType,nullable = true),StructField("role",StringType,nullable = true)))
maintenant, vous pouvez convertir le RDD en DataFrame en utilisant la méthode createDataFrame .
val aNamedDF = sqlContext.createDataFrame(aRdd,aStruct)
Note: Cette réponse a été initialement posté ici
je poste cette réponse parce que je voudrais partager des détails supplémentaires sur les options disponibles que je n'ai pas trouvé dans les autres réponses
pour créer une base de données à partir d'un RDD de lignes, il y a deux options principales:
1) comme déjà indiqué, vous pouvez utiliser toDF()
qui peut être importé par import sqlContext.implicits._
. Toutefois, cette approche ne fonctionne que pour les types de RDDs suivants:
-
RDD[Int]
-
RDD[Long]
-
RDD[String]
-
RDD[T <: scala.Product]
(source: Scaladoc de la SQLContext.implicits
de l'objet)
la dernière signature signifie en fait qu'elle peut fonctionner pour un RDD de tuples ou un RDD de classes de cas (parce que tuples et classes de cas sont des sous-classes de scala.Product
).
Alors, d'utiliser cette approche pour la RDD[Row]
, vous devez l'associer à un RDD[T <: scala.Product]
. Cela peut être fait en mappant chaque ligne à une classe de cas personnalisé ou à un tuple, comme dans les extraits de code suivants:
val df = rdd.map({
case Row(val1: String, ..., valN: Long) => (val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")
ou
case class MyClass(val1: String, ..., valN: Long = 0L)
val df = rdd.map({
case Row(val1: String, ..., valN: Long) => MyClass(val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")
Le principal inconvénient de cette approche (à mon avis) est que vous devez explicitement définir le schéma du datagramme résultant dans la fonction map, colonne par colonne. Peut-être que cela peut être fait programmatiquement si vous ne connaissez pas le schéma à l'avance, mais les choses peuvent devenir un peu désordonnée là. Donc, alternativement, il y a une autre option:
2) vous pouvez utiliser createDataFrame(rowRDD: RDD[Row], schema: StructType)
comme dans la réponse acceptée, qui est disponible dans le SQLContext object. Exemple pour convertir un RDD d'une ancienne DataFrame:
val rdd = oldDF.rdd
val newDF = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema)
notez qu'il n'est pas nécessaire de définir explicitement une colonne de schéma. Nous réutilisons le vieux schéma DF, qui est de classe StructType
et peut être facilement étendu. Cependant, cette approche n'est parfois pas possible, et dans certains cas peut s'avérer moins efficace que le premier.
voici un exemple simple de conversion de votre liste en Spark RDD et de conversion de cette Spark RDD en Dataframe.
s'il vous plaît noter que j'ai utilisé la version scala de Spark-shell pour exécuter le code suivant, ici sc est une instance de SparkContext qui est implicitement disponible dans Spark-shell. Espérons qu'il répondra à vos questions.
scala> val numList = List(1,2,3,4,5)
numList: List[Int] = List(1, 2, 3, 4, 5)
scala> val numRDD = sc.parallelize(numList)
numRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[80] at parallelize at <console>:28
scala> val numDF = numRDD.toDF
numDF: org.apache.spark.sql.DataFrame = [_1: int]
scala> numDF.show
+---+
| _1|
+---+
| 1|
| 2|
| 3|
| 4|
| 5|
+---+
Méthode 1: (Scala)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val df_2 = sc.parallelize(Seq((1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c"))).toDF("x", "y", "z")
Méthode 2: (Scala)
case class temp(val1: String,val3 : Double)
val rdd = sc.parallelize(Seq(
Row("foo", 0.5), Row("bar", 0.0)
))
val rows = rdd.map({case Row(val1:String,val3:Double) => temp(val1,val3)}).toDF()
rows.show()
Méthode 1: (Python)
from pyspark.sql import Row
l = [('Alice',2)]
Person = Row('name','age')
rdd = sc.parallelize(l)
person = rdd.map(lambda r:Person(*r))
df2 = sqlContext.createDataFrame(person)
df2.show()
Méthode 2: (Python)
from pyspark.sql.types import *
l = [('Alice',2)]
rdd = sc.parallelize(l)
schema = StructType([StructField ("name" , StringType(), True) ,
StructField("age" , IntegerType(), True)])
df3 = sqlContext.createDataFrame(rdd, schema)
df3.show()
extrait la valeur de l'objet row et ensuite appliqué la classe de cas pour convertir rdd en DF
val temp1 = attrib1.map{case Row ( key: Int ) => s"$key" }
val temp2 = attrib2.map{case Row ( key: Int) => s"$key" }
case class RLT (id: String, attrib_1 : String, attrib_2 : String)
import hiveContext.implicits._
val df = result.map{ s => RLT(s(0),s(1),s(2)) }.toDF
sur les nouvelles versions de spark (2.0+). Cela fonctionnera même sans sqlcontext disponible.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val spark = SparkSession
.builder()
.getOrCreate()
import spark.implicits._
val dfSchema = Seq("col1", "col2", "col3")
rdd.toDF(dfSchema: _*)
One needs to create a schema, and attach it to the Rdd.
si val spark est le produit d'une étincelle.constructeur...
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
/* Lets gin up some sample data:
* As RDD's and dataframes can have columns of differing types, lets make our
* sample data a three wide, two tall, rectangle of mixed types.
* A column of Strings, a column of Longs, and a column of Doubules
*/
val arrayOfArrayOfAnys = Array.ofDim[Any](2,3)
arrayOfArrayOfAnys(0)(0)="aString"
arrayOfArrayOfAnys(0)(1)=0L
arrayOfArrayOfAnys(0)(2)=3.14159
arrayOfArrayOfAnys(1)(0)="bString"
arrayOfArrayOfAnys(1)(1)=9876543210L
arrayOfArrayOfAnys(1)(2)=2.71828
/* The way to convert an anything which looks rectangular,
* (Array[Array[String]] or Array[Array[Any]] or Array[Row], ... ) into an RDD is to
* throw it into sparkContext.parallelize.
* http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext shows
* the parallelize definition as
* def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)
* so in our case our ArrayOfArrayOfAnys is treated as a sequence of ArraysOfAnys.
* Will leave the numSlices as the defaultParallelism, as I have no particular cause to change it.
*/
val rddOfArrayOfArrayOfAnys=spark.sparkContext.parallelize(arrayOfArrayOfAnys)
/* We'll be using the sqlContext.createDataFrame to add a schema our RDD.
* The RDD which goes into createDataFrame is an RDD[Row] which is not what we happen to have.
* To convert anything one tall and several wide into a Row, one can use Row.fromSeq(thatThing.toSeq)
* As we have an RDD[somethingWeDontWant], we can map each of the RDD rows into the desired Row type.
*/
val rddOfRows=rddOfArrayOfArrayOfAnys.map(f=>
Row.fromSeq(f.toSeq)
)
/* Now to construct our schema. This needs to be a StructType of 1 StructField per column in our dataframe.
* https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.StructField shows the definition as
* case class StructField(name: String, dataType: DataType, nullable: Boolean = true, metadata: Metadata = Metadata.empty)
* Will leave the two default values in place for each of the columns:
* nullability as true,
* metadata as an empty Map[String,Any]
*
*/
val schema = StructType(
StructField("colOfStrings", StringType) ::
StructField("colOfLongs" , LongType ) ::
StructField("colOfDoubles", DoubleType) ::
Nil
)
val df=spark.sqlContext.createDataFrame(rddOfRows,schema)
/*
* +------------+----------+------------+
* |colOfStrings|colOfLongs|colOfDoubles|
* +------------+----------+------------+
* | aString| 0| 3.14159|
* | bString|9876543210| 2.71828|
* +------------+----------+------------+
*/
df.show
mêmes étapes, mais avec moins de déclarations val:
val arrayOfArrayOfAnys=Array(
Array("aString",0L ,3.14159),
Array("bString",9876543210L,2.71828)
)
val rddOfRows=spark.sparkContext.parallelize(arrayOfArrayOfAnys).map(f=>Row.fromSeq(f.toSeq))
/* If one knows the datatypes, for instance from JDBC queries as to RDBC column metadata:
* Consider constructing the schema from an Array[StructField]. This would allow looping over
* the columns, with a match statement applying the appropriate sql datatypes as the second
* StructField arguments.
*/
val sf=new Array[StructField](3)
sf(0)=StructField("colOfStrings",StringType)
sf(1)=StructField("colOfLongs" ,LongType )
sf(2)=StructField("colOfDoubles",DoubleType)
val df=spark.sqlContext.createDataFrame(rddOfRows,StructType(sf.toList))
df.show
pour convertir Un tableau[ligne] en DataFrame ou Dataset, les travaux suivants élégamment:
Dire, le schéma est le StructType de la ligne,puis
val rows: Array[Row]=...
implicit val encoder = RowEncoder.apply(schema)
import spark.implicits._
rows.toDS