Comment convertir un objet rdd en dataframe en spark

comment convertir un RDD ( org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] ) en Dataframe org.apache.spark.sql.DataFrame . J'ai converti une base de données en rdd en utilisant .rdd . Après le traitement, je le veux dans dataframe. Comment puis-je faire cela ?

94
demandé sur suci 2015-04-01 08:38:33

10 réponses

SqlContext a un certain nombre de createDataFrame méthodes qui créent un DataFrame donné un RDD . J'imagine que l'un d'eux conviendra à votre contexte.

par exemple:

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

crée une base de données à partir d'un RDD contenant des lignes en utilisant le schéma.

67
répondu The Archetypal Paul 2015-04-01 08:11:08

en supposant que votre RDD [ligne] s'appelle rdd, vous pouvez utiliser:

val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._
rdd.toDF()
61
répondu dtjones 2015-07-07 18:27:11

ce code fonctionne parfaitement de Spark 2.x avec Scala 2.11

Importer les classes

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

"créer SparkSession objet, ici c'est spark

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate
val sc = spark.sparkContext // Just used to create test RDDs

disons un RDD pour en faire un DataFrame

val rdd = sc.parallelize(
  Seq(
    ("first", Array(2.0, 1.0, 2.1, 5.4)),
    ("test", Array(1.5, 0.5, 0.9, 3.7)),
    ("choose", Array(8.0, 2.9, 9.1, 2.5))
  )
)

Méthode 1

par SparkSession.createDataFrame(RDD obj) .

val dfWithoutSchema = spark.createDataFrame(rdd)

dfWithoutSchema.show()
+------+--------------------+
|    _1|                  _2|
+------+--------------------+
| first|[2.0, 1.0, 2.1, 5.4]|
|  test|[1.5, 0.5, 0.9, 3.7]|
|choose|[8.0, 2.9, 9.1, 2.5]|
+------+--------------------+

Méthode 2 1519300920"

en utilisant SparkSession.createDataFrame(RDD obj) et en spécifiant le nom des colonnes.

val dfWithSchema = spark.createDataFrame(rdd).toDF("id", "vals")

dfWithSchema.show()
+------+--------------------+
|    id|                vals|
+------+--------------------+
| first|[2.0, 1.0, 2.1, 5.4]|
|  test|[1.5, 0.5, 0.9, 3.7]|
|choose|[8.0, 2.9, 9.1, 2.5]|
+------+--------------------+

la Méthode 3 (Réelle réponse à la question)

de cette façon, l'entrée rdd doit être du type RDD[Row] .

val rowsRdd: RDD[Row] = sc.parallelize(
  Seq(
    Row("first", 2.0, 7.0),
    Row("second", 3.5, 2.5),
    Row("third", 7.0, 5.9)
  )
)

créer le schéma

val schema = new StructType()
  .add(StructField("id", StringType, true))
  .add(StructField("val1", DoubleType, true))
  .add(StructField("val2", DoubleType, true))

Appliquer maintenant les deux rowsRdd et schema à createDataFrame()

val df = spark.createDataFrame(rowsRdd, schema)

df.show()
+------+----+----+
|    id|val1|val2|
+------+----+----+
| first| 2.0| 7.0|
|second| 3.5| 2.5|
| third| 7.0| 5.9|
+------+----+----+
37
répondu mrsrinivas 2017-08-30 17:11:21

supposons que vous ayez un DataFrame et que vous voulez modifier les données des champs en les convertissant en RDD[Row] .

val aRdd = aDF.map(x=>Row(x.getAs[Long]("id"),x.getAs[List[String]]("role").head))

pour revenir à DataFrame de RDD nous avons besoin de définir le type de structure du RDD .

si le type de données était Long alors il deviendra comme LongType dans la structure.

si String alors StringType de structure.

val aStruct = new StructType(Array(StructField("id",LongType,nullable = true),StructField("role",StringType,nullable = true)))

maintenant, vous pouvez convertir le RDD en DataFrame en utilisant la méthode createDataFrame .

val aNamedDF = sqlContext.createDataFrame(aRdd,aStruct)
14
répondu Ajay Gupta 2015-09-26 15:54:17

Note: Cette réponse a été initialement posté ici

je poste cette réponse parce que je voudrais partager des détails supplémentaires sur les options disponibles que je n'ai pas trouvé dans les autres réponses


pour créer une base de données à partir d'un RDD de lignes, il y a deux options principales:

1) comme déjà indiqué, vous pouvez utiliser toDF() qui peut être importé par import sqlContext.implicits._ . Toutefois, cette approche ne fonctionne que pour les types de RDDs suivants:

  • RDD[Int]
  • RDD[Long]
  • RDD[String]
  • RDD[T <: scala.Product]

(source: Scaladoc de la SQLContext.implicits de l'objet)

la dernière signature signifie en fait qu'elle peut fonctionner pour un RDD de tuples ou un RDD de classes de cas (parce que tuples et classes de cas sont des sous-classes de scala.Product ).

Alors, d'utiliser cette approche pour la RDD[Row] , vous devez l'associer à un RDD[T <: scala.Product] . Cela peut être fait en mappant chaque ligne à une classe de cas personnalisé ou à un tuple, comme dans les extraits de code suivants:

val df = rdd.map({ 
  case Row(val1: String, ..., valN: Long) => (val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")

ou

case class MyClass(val1: String, ..., valN: Long = 0L)
val df = rdd.map({ 
  case Row(val1: String, ..., valN: Long) => MyClass(val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")

Le principal inconvénient de cette approche (à mon avis) est que vous devez explicitement définir le schéma du datagramme résultant dans la fonction map, colonne par colonne. Peut-être que cela peut être fait programmatiquement si vous ne connaissez pas le schéma à l'avance, mais les choses peuvent devenir un peu désordonnée là. Donc, alternativement, il y a une autre option:


2) vous pouvez utiliser createDataFrame(rowRDD: RDD[Row], schema: StructType) comme dans la réponse acceptée, qui est disponible dans le SQLContext object. Exemple pour convertir un RDD d'une ancienne DataFrame:

val rdd = oldDF.rdd
val newDF = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema)

notez qu'il n'est pas nécessaire de définir explicitement une colonne de schéma. Nous réutilisons le vieux schéma DF, qui est de classe StructType et peut être facilement étendu. Cependant, cette approche n'est parfois pas possible, et dans certains cas peut s'avérer moins efficace que le premier.

12
répondu Daniel de Paula 2017-05-23 11:33:25

voici un exemple simple de conversion de votre liste en Spark RDD et de conversion de cette Spark RDD en Dataframe.

s'il vous plaît noter que j'ai utilisé la version scala de Spark-shell pour exécuter le code suivant, ici sc est une instance de SparkContext qui est implicitement disponible dans Spark-shell. Espérons qu'il répondra à vos questions.

scala> val numList = List(1,2,3,4,5)
numList: List[Int] = List(1, 2, 3, 4, 5)

scala> val numRDD = sc.parallelize(numList)
numRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[80] at parallelize at <console>:28

scala> val numDF = numRDD.toDF
numDF: org.apache.spark.sql.DataFrame = [_1: int]

scala> numDF.show
+---+
| _1|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
+---+
5
répondu Rashmit Rathod 2015-12-05 09:11:34

Méthode 1: (Scala)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val df_2 = sc.parallelize(Seq((1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c"))).toDF("x", "y", "z")

Méthode 2: (Scala)

case class temp(val1: String,val3 : Double) 

val rdd = sc.parallelize(Seq(
  Row("foo",  0.5), Row("bar",  0.0)
))
val rows = rdd.map({case Row(val1:String,val3:Double) => temp(val1,val3)}).toDF()
rows.show()

Méthode 1: (Python)

from pyspark.sql import Row
l = [('Alice',2)]
Person = Row('name','age')
rdd = sc.parallelize(l)
person = rdd.map(lambda r:Person(*r))
df2 = sqlContext.createDataFrame(person)
df2.show()

Méthode 2: (Python)

from pyspark.sql.types import * 
l = [('Alice',2)]
rdd = sc.parallelize(l)
schema =  StructType([StructField ("name" , StringType(), True) , 
StructField("age" , IntegerType(), True)]) 
df3 = sqlContext.createDataFrame(rdd, schema) 
df3.show()

extrait la valeur de l'objet row et ensuite appliqué la classe de cas pour convertir rdd en DF

val temp1 = attrib1.map{case Row ( key: Int ) => s"$key" }
val temp2 = attrib2.map{case Row ( key: Int) => s"$key" }

case class RLT (id: String, attrib_1 : String, attrib_2 : String)
import hiveContext.implicits._

val df = result.map{ s => RLT(s(0),s(1),s(2)) }.toDF
5
répondu Aravind Krishnakumar 2017-03-02 16:38:32

sur les nouvelles versions de spark (2.0+). Cela fonctionnera même sans sqlcontext disponible.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val spark = SparkSession
  .builder()
  .getOrCreate()
import spark.implicits._

val dfSchema = Seq("col1", "col2", "col3")
rdd.toDF(dfSchema: _*)
3
répondu ozzieisaacs 2017-08-09 16:55:30
One needs to create a schema, and attach it to the Rdd.

si val spark est le produit d'une étincelle.constructeur...

    import org.apache.spark._
    import org.apache.spark.sql._       
    import org.apache.spark.sql.types._

    /* Lets gin up some sample data:
     * As RDD's and dataframes can have columns of differing types, lets make our
     * sample data a three wide, two tall, rectangle of mixed types.
     * A column of Strings, a column of Longs, and a column of Doubules 
     */
    val arrayOfArrayOfAnys = Array.ofDim[Any](2,3)
    arrayOfArrayOfAnys(0)(0)="aString"
    arrayOfArrayOfAnys(0)(1)=0L
    arrayOfArrayOfAnys(0)(2)=3.14159
    arrayOfArrayOfAnys(1)(0)="bString"
    arrayOfArrayOfAnys(1)(1)=9876543210L
    arrayOfArrayOfAnys(1)(2)=2.71828

    /* The way to convert an anything which looks rectangular, 
     * (Array[Array[String]] or Array[Array[Any]] or Array[Row], ... ) into an RDD is to 
     * throw it into sparkContext.parallelize.
     * http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext shows
     * the parallelize definition as 
     *     def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)
     * so in our case our ArrayOfArrayOfAnys is treated as a sequence of ArraysOfAnys.
     * Will leave the numSlices as the defaultParallelism, as I have no particular cause to change it. 
     */
    val rddOfArrayOfArrayOfAnys=spark.sparkContext.parallelize(arrayOfArrayOfAnys)

    /* We'll be using the sqlContext.createDataFrame to add a schema our RDD.
     * The RDD which goes into createDataFrame is an RDD[Row] which is not what we happen to have.
     * To convert anything one tall and several wide into a Row, one can use Row.fromSeq(thatThing.toSeq)
     * As we have an RDD[somethingWeDontWant], we can map each of the RDD rows into the desired Row type. 
     */     
    val rddOfRows=rddOfArrayOfArrayOfAnys.map(f=>
        Row.fromSeq(f.toSeq)
    )

    /* Now to construct our schema. This needs to be a StructType of 1 StructField per column in our dataframe.
     * https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.StructField shows the definition as
     *   case class StructField(name: String, dataType: DataType, nullable: Boolean = true, metadata: Metadata = Metadata.empty)
     * Will leave the two default values in place for each of the columns:
     *        nullability as true, 
     *        metadata as an empty Map[String,Any]
     *   
     */

    val schema = StructType(
        StructField("colOfStrings", StringType) ::
        StructField("colOfLongs"  , LongType  ) ::
        StructField("colOfDoubles", DoubleType) ::
        Nil
    )

    val df=spark.sqlContext.createDataFrame(rddOfRows,schema)
    /*
     *      +------------+----------+------------+
     *      |colOfStrings|colOfLongs|colOfDoubles|
     *      +------------+----------+------------+
     *      |     aString|         0|     3.14159|
     *      |     bString|9876543210|     2.71828|
     *      +------------+----------+------------+
    */ 
    df.show 

mêmes étapes, mais avec moins de déclarations val:

    val arrayOfArrayOfAnys=Array(
        Array("aString",0L         ,3.14159),
        Array("bString",9876543210L,2.71828)
    )

    val rddOfRows=spark.sparkContext.parallelize(arrayOfArrayOfAnys).map(f=>Row.fromSeq(f.toSeq))

    /* If one knows the datatypes, for instance from JDBC queries as to RDBC column metadata:
     * Consider constructing the schema from an Array[StructField].  This would allow looping over 
     * the columns, with a match statement applying the appropriate sql datatypes as the second
     *  StructField arguments.   
     */
    val sf=new Array[StructField](3)
    sf(0)=StructField("colOfStrings",StringType)
    sf(1)=StructField("colOfLongs"  ,LongType  )
    sf(2)=StructField("colOfDoubles",DoubleType)        
    val df=spark.sqlContext.createDataFrame(rddOfRows,StructType(sf.toList))
    df.show
1
répondu teserecter 2017-01-17 13:02:33

pour convertir Un tableau[ligne] en DataFrame ou Dataset, les travaux suivants élégamment:

Dire, le schéma est le StructType de la ligne,puis

val rows: Array[Row]=...
implicit val encoder = RowEncoder.apply(schema)
import spark.implicits._
rows.toDS
-1
répondu Tom 2018-04-03 00:43:52