Comment stocker des objets personnalisés dans Dataset?

selon introduisant les données D'étincelles :

alors que nous attendons Spark 2.0, nous prévoyons des améliorations intéressantes aux ensembles de données, en particulier: ... Encodeurs personnalisés-alors que nous autorégulons actuellement les encodeurs pour une grande variété de types, nous aimerions ouvrir une API pour les objets personnalisés.

et tente de stocker le type personnalisé dans un Dataset conduire à l'erreur suivante comme:

impossible de trouver un encodeur pour le type stocké dans un ensemble de données. Les types primitifs (Int, String, etc) et les types de produits (classes de cas) sont supportés par l'importation de sqlContext.implicite._ La prise en charge de la sérialisation d'autres types sera ajoutée dans les versions futures

ou:

Java.lang.UnsupportedOperationException: aucun encodeur trouvé pour ....

y a-t-il existant des solutions de contournement?


Note cette question n'existe que comme point d'entrée pour une réponse Wiki communautaire. N'hésitez pas à mettre à jour / améliorer les questions et les réponses.

109
demandé sur zero323 2016-04-15 16:11:07

7 réponses

mise à Jour

cette réponse est toujours valable et informative, bien que les choses soient maintenant mieux depuis 2.2 / 2.3, qui ajoute le support d'encodeur intégré pour Set , Seq , Map , Date , Timestamp , et BigDecimal . Si vous vous en tenez à faire des types avec seulement des classes de cas et les types habituels de Scala, vous devriez être d'accord avec juste l'implicite dans SQLImplicits .


malheureusement, pratiquement rien n'a été ajouté pour aider à cela. La recherche de @since 2.0.0 dans Encoders.scala ou SQLImplicits.scala trouve des choses principalement à faire avec les types primitifs (et quelques retouches des classes de cas). Donc, première chose à dire: il n'y a actuellement pas de véritable bon support pour les encodeurs de classe personnalisés . Une fois cela fait, voici quelques astuces qui font un travail aussi bien que ce que nous pouvons espérer, étant donné ce que nous actuellement à notre disposition. En guise de mise en garde: cela ne fonctionnera pas parfaitement et je ferai de mon mieux pour rendre toutes les limites claires et directes.

Quel est le problème exactement

lorsque vous voulez faire un ensemble de données, Spark "nécessite un encodeur (pour convertir un objet JVM de type T à et à partir de la représentation interne SQL Spark) qui est généralement créé automatiquement par implicits à partir d'un SparkSession , ou peut être créé explicitement par appelant les méthodes statiques sur Encoders "(tiré du docs sur createDataset ). Un codeur prendra la forme Encoder[T]T est le type d'encodage. La première suggestion est d'ajouter import spark.implicits._ (qui vous donne ces codeurs implicites) et la seconde suggestion est de passer explicitement dans l'encodeur implicite en utilisant cet ensemble de fonctions liées à l'encodeur.

Il n'y a pas d'encodeur disponible pour les classes régulières, donc

import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

vous donnera l'erreur implicite de temps de compilation liée suivante:

impossible de trouver un encodeur pour le type stocké dans un ensemble de données. Les types primitifs (Int, String, etc) et les types de produits (classes de cas) sont supportés par l'importation de sqlContext.implicite._ La prise en charge de la sérialisation d'autres types sera ajoutée dans les versions futures

Cependant, si vous envelopper quel que soit le type que vous venez d'utiliser pour obtenir l'erreur ci-dessus dans une certaine classe qui s'étend Product , l'erreur confusingly obtient retardé à l'exécution, donc

import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

compile très bien, mais échoue à l'exécution avec

de java.lang.UnsupportedOperationException: aucun encodeur trouvé pour MyObj

la raison en est que L'étincelle d'encodeurs crée avec les implicits sont en fait seulement fait à l'exécution (via Scala relfection). Dans ce cas, toutes les vérifications de L'étincelle au moment de la compilation est que la classe la plus externe s'étend Product (ce que font toutes les classes de cas), et ne se rend compte à l'exécution qu'elle ne sait toujours pas quoi faire avec MyObj (le même problème se produit si j'ai essayé de faire un Dataset[(Int,MyObj)] - L'étincelle attend jusqu'à l'exécution pour vomir sur MyObj ). Ce sont là des problèmes centraux qui doivent absolument être résolus:

  • certaines classes qui étendent Product compilent malgré un crash constant à l'exécution et
  • il n'y a aucun moyen de passer dans les encodeurs personnalisés pour les types imbriqués (Je n'ai aucun moyen d'alimenter Spark un encodeur pour juste MyObj tel qu'il sache alors coder Wrap[MyObj] ou (Int,MyObj) ).

il suffit d'utiliser kryo

la solution que tout le monde suggère est d'utiliser le kryo "1519710920 de l'encodeur".

import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

cela devient assez fastidieux rapide cependant. Surtout si votre code manipule toutes sortes d'ensembles de données, de jointures, de regroupements, etc. Vous finissez par accumuler un tas d'implications supplémentaires. Donc, pourquoi ne pas simplement faire un implicite qui fait tout cela automatiquement?

import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = 
  org.apache.spark.sql.Encoders.kryo[A](ct)

et maintenant, il semble que je peux faire presque tout ce que je veux (l'exemple ci-dessous ne fonctionnera pas dans le spark-shellspark.implicits._ est automatiquement imported)

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!

ou presque. Le problème est que l'utilisation de kryo conduit à Spark simplement stocker chaque ligne dans l'ensemble de données comme un objet binaire plat. Pour map , filter , foreach C'est suffisant, mais pour des opérations comme join , Spark a vraiment besoin que celles-ci soient séparées en colonnes. En examinant le schéma pour d2 ou d3 , vous voyez qu'il n'y a qu'une colonne binaire:

d2.printSchema
// root
//  |-- value: binary (nullable = true)

partielle solution pour tuples

ainsi, en utilisant la magie des implicits dans Scala (plus dans 6.26.3 résolution de surcharge ), je peux me faire une série d'implicits qui feront un travail aussi bon que possible, au moins pour les tuples, et qui fonctionneront bien avec les implicits existants:

import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._  // we can still take advantage of all the old implicits

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)

implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)

// ... you can keep making these

alors, armé de ces implications, je peux faire fonctionner mon exemple au-dessus, bien qu'avec une colonne rebaptisée

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")

I Je n'ai pas encore trouvé comment obtenir les noms de tuples attendus ( _1 , _2 , ...) par défaut sans les renommer - si quelqu'un d'autre veut jouer avec ceci, ce est l'endroit où le nom "value" est introduit et ce est l'endroit où les noms de tuples sont habituellement ajoutés. Cependant, le point clé est que j'ai maintenant un schéma bien structuré:

d4.printSchema
// root
//  |-- _1: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
//  |-- _2: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)

donc, en Résumé, Cette solution:

  • nous permet d'obtenir des colonnes séparées pour les tuples (de sorte que nous pouvons rejoindre sur tuples à nouveau, yay!)
  • nous pouvons à nouveau nous fier aux implicits (donc pas besoin de passer dans kryo partout)
  • est presque entièrement rétrograde compatible avec import spark.implicits._ (avec un certain renommage impliqué)
  • ne pas unissons-nous sur le kyro sérialisé colonnes binaires, encore moins sur les champs qui peuvent avoir
  • a l'effet secondaire désagréable de renommer certaines des colonnes de tuples en" valeur "(si nécessaire, cela peut être défait en convertissant .toDF , en spécifiant de nouveaux noms de colonne, et en convertissant de nouveau à un ensemble de données-et les noms de schéma semblent être préservés par des jointures, où ils sont les plus nécessaires).

solution partielle pour les classes en général

celui-ci est moins agréable et n'a pas de bonne solution. Cependant, maintenant que nous avons la solution tuple ci-dessus, j'ai le pressentiment que la solution de conversion implicite d'une autre réponse sera un peu moins douloureuse aussi puisque vous pouvez convertir vos classes plus complexes en tuples. Ensuite, après avoir créé l'ensemble de données, vous devriez probablement renommer les colonnes en utilisant l'approche de base de données. Si tout va bien, c'est vraiment une amélioration puisque je peux maintenant effectuer jointures sur les champs de mes classes. Si j'avais juste utilisé un plat binaire kryo sérialiseur qui n'aurait pas été possible.

Voici un exemple qui fait un peu de tout: j'ai une classe MyObj qui a des champs de types Int , java.util.UUID , et Set[String] . Le premier prend soin de lui-même. Le second, bien que je puisse sérialiser en utilisant kryo serait plus utile si stocké comme un String (depuis UUID s sont généralement quelque chose que je voudrais joindre contre). Troisième vraiment seulement le fait dans une colonne binaire.

class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])

// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])

// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)

maintenant, je peux créer un ensemble de données avec un schéma agréable en utilisant cette machine:

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]

et le schéma me montre je colonnes avec les bons noms et avec les deux premiers deux choses que je peux joindre contre.

d.printSchema
// root
//  |-- i: integer (nullable = false)
//  |-- u: string (nullable = true)
//  |-- s: binary (nullable = true)
170
répondu Alec 2017-09-13 17:19:00
  1. utilisant des codeurs génériques.

    il existe deux encodeurs génériques disponibles pour le moment kryo et javaSerialization où ce dernier est explicitement décrit comme:

    extrêmement inefficace et ne doit être utilisée qu'en dernier recours.

    , dans l'hypothèse de la classe suivante

    class Bar(i: Int) {
      override def toString = s"bar $i"
      def bar = i
    }
    

    vous pouvez utiliser ces encodeurs en ajoutant l'encodeur implicite:

    object BarEncoders {
      implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = 
      org.apache.spark.sql.Encoders.kryo[Bar]
    }
    

    qui peuvent être utilisés ensemble comme suit:

    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarEncoders._
    
        val ds = Seq(new Bar(1)).toDS
        ds.show
    
        sc.stop()
      }
    }
    

    il stocke des objets comme binary colonne de sorte que lorsque converti en DataFrame vous obtenez le schéma suivant:

    root
     |-- value: binary (nullable = true)
    

    il est également possible d'encoder tuples en utilisant l'encodeur kryo pour un champ spécifique:

    val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar])
    
    spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder)
    // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]
    

    s'il vous plaît noter que nous ne dépend de codeurs implicites ici, mais passer l'encodeur explicitement de sorte que cela ne fonctionnera probablement pas avec la méthode toDS .

  2. utilisant des conversions implicites:

    fournit des conversions implicites entre la représentation qui peut être encodée et la classe personnalisée, par exemple:

    object BarConversions {
      implicit def toInt(bar: Bar): Int = bar.bar
      implicit def toBar(i: Int): Bar = new Bar(i)
    }
    
    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarConversions._
    
        type EncodedBar = Int
    
        val bars: RDD[EncodedBar]  = sc.parallelize(Seq(new Bar(1)))
        val barsDS = bars.toDS
    
        barsDS.show
        barsDS.map(_.bar).show
    
        sc.stop()
      }
    }
    

questions connexes:

28
répondu zero323 2017-05-23 12:10:26

les encodeurs fonctionnent plus ou moins de la même manière dans Spark2.0 . Et Kryo est toujours le choix recommandé serialization .

vous pouvez regarder l'exemple suivant avec spark-shell

scala> import spark.implicits._
import spark.implicits._

scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders

scala> case class NormalPerson(name: String, age: Int) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class NormalPerson

scala> case class ReversePerson(name: Int, age: String) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class ReversePerson

scala> val normalPersons = Seq(
 |   NormalPerson("Superman", 25),
 |   NormalPerson("Spiderman", 17),
 |   NormalPerson("Ironman", 29)
 | )
normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29))

scala> val ds1 = sc.parallelize(normalPersons).toDS
ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int]

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds1.show()
+---------+---+
|     name|age|
+---------+---+
| Superman| 25|
|Spiderman| 17|
|  Ironman| 29|
+---------+---+

scala> ds2.show()
+----+---------+
|name|      age|
+----+---------+
|  25| Superman|
|  17|Spiderman|
|  29|  Ironman|
+----+---------+

scala> ds1.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Superman. I am 25 years old.
I am Spiderman. I am 17 years old.

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds2.foreach(p => println(p.aboutMe))
I am 17. I am Spiderman years old.
I am 25. I am Superman years old.
I am 29. I am Ironman years old.

Jusqu'à présent] il n'y avait pas de appropriate encoders dans la portée actuelle de sorte que nos personnes n'étaient pas codées comme des valeurs binary . Mais cela changera une fois que nous aurons fourni quelques encodeurs implicit en utilisant la sérialisation Kryo .

// Provide Encoders

scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson]
normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary]

scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson]
reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary]

// Ecoders will be used since they are now present in Scope

scala> val ds3 = sc.parallelize(normalPersons).toDS
ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary]

scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name))
ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary]

// now all our persons show up as binary values
scala> ds3.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

scala> ds4.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

// Our instances still work as expected    

scala> ds3.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Spiderman. I am 17 years old.
I am Superman. I am 25 years old.

scala> ds4.foreach(p => println(p.aboutMe))
I am 25. I am Superman years old.
I am 29. I am Ironman years old.
I am 17. I am Spiderman years old.
5
répondu Sarvesh Kumar Singh 2016-09-09 19:48:46

dans le cas de classe Java Bean, cela peut être utile

import spark.sqlContext.implicits._
import org.apache.spark.sql.Encoders
implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass])

Maintenant, vous pouvez simplement lire la dataFrame comme DataFrame personnalisé

dataFrame.as[MyClass]

cela créera un encodeur de classe personnalisé et non un binaire.

3
répondu Akash Mahajan 2017-01-05 13:42:13

vous pouvez utiliser UDTRegistration et ensuite les classes de cas, Tuples, etc... tous fonctionnent correctement avec votre type défini par L'utilisateur!

dites que vous voulez utiliser un Enum personnalisé:

trait CustomEnum { def value:String }
case object Foo extends CustomEnum  { val value = "F" }
case object Bar extends CustomEnum  { val value = "B" }
object CustomEnum {
  def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get
}

enregistrer comme ceci:

// First define a UDT class for it:
class CustomEnumUDT extends UserDefinedType[CustomEnum] {
  override def sqlType: DataType = org.apache.spark.sql.types.StringType
  override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
  // Note that this will be a UTF8String type
  override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
  override def userClass: Class[CustomEnum] = classOf[CustomEnum]
}

// Then Register the UDT Class!
// NOTE: you have to put this file into the org.apache.spark package!
UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)

alors utilisez-le!

case class UsingCustomEnum(id:Int, en:CustomEnum)

val seq = Seq(
  UsingCustomEnum(1, Foo),
  UsingCustomEnum(2, Bar),
  UsingCustomEnum(3, Foo)
).toDS()
seq.filter(_.en == Foo).show()
println(seq.collect())

Dites vous souhaitez utiliser un Polymorphe:

trait CustomPoly
case class FooPoly(id:Int) extends CustomPoly
case class BarPoly(value:String, secondValue:Long) extends CustomPoly

... et l'utiliser comme ceci:

case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

vous pouvez écrire un UDT personnalisé qui encode tout en octets (j'utilise la sérialisation java ici, mais il est probablement préférable d'utiliser le contexte Kryo d'instrument Spark).

première définition de la classe UDT:

class CustomPolyUDT extends UserDefinedType[CustomPoly] {
  val kryo = new Kryo()

  override def sqlType: DataType = org.apache.spark.sql.types.BinaryType
  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)

    bos.toByteArray
  }
  override def deserialize(datum: Any): CustomPoly = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    val obj = ois.readObject()
    obj.asInstanceOf[CustomPoly]
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}

puis l'enregistrer:

// NOTE: The file you do this in has to be inside of the org.apache.spark package!
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)

, Alors vous pouvez l'utiliser!

// As shown above:
case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()
3
répondu ChoppyTheLumberjack 2018-08-21 22:44:08

mes exemples seront en Java, mais je n'imagine pas qu'il soit difficile de s'adapter à Scala.

j'ai réussi à convertir RDD<Fruit> en Dataset<Fruit> en utilisant spark.createDataset et codeurs.bean aussi longtemps que Fruit est un simple Java Bean .

Étape 1: Créer le Simple Java Bean.

public class Fruit implements Serializable {
    private String name  = "default-fruit";
    private String color = "default-color";

    // AllArgsConstructor
    public Fruit(String name, String color) {
        this.name  = name;
        this.color = color;
    }

    // NoArgsConstructor
    public Fruit() {
        this("default-fruit", "default-color");
    }

    // ...create getters and setters for above fields
    // you figure it out
}

Je m'en tiendrais à des classes avec des types primitifs et des chaînes de caractères comme des champs avant que les gens des banques de données renforcent leurs encodeurs. si vous avez une classe avec un objet imbriqué, créez un autre Java Bean simple avec tous ses champs aplatis, de sorte que vous pouvez utiliser les transformations RDD pour mapper le type complexe au plus simple. bien sûr que c'est un peu de travail supplémentaire, mais j'imagine que ça aidera beaucoup sur la performance en travaillant avec un schéma plat.

Étape 2: Obtenez votre ensemble de données à partir du RDD

SparkSession spark = SparkSession.builder().getOrCreate();
JavaSparkContext jsc = new JavaSparkContext();

List<Fruit> fruitList = ImmutableList.of(
    new Fruit("apple", "red"),
    new Fruit("orange", "orange"),
    new Fruit("grape", "purple"));
JavaRDD<Fruit> fruitJavaRDD = jsc.parallelize(fruitList);


RDD<Fruit> fruitRDD = fruitJavaRDD.rdd();
Encoder<Fruit> fruitBean = Encoders.bean(Fruit.class);
Dataset<Fruit> fruitDataset = spark.createDataset(rdd, bean);

et voilà! Mouillez, rincez, répétez.

1
répondu Jimmy Da 2017-06-06 20:19:09

pour ceux qui pourraient dans ma situation je mets ma réponse ici, aussi.

Pour être plus précis,

  1. je lisais "Set typed data" à partir de SQLContext. Donc le format de données original est DataFrame.

    val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1") sample.show()

    +---+---+ | a| b| +---+---+ | 1|[1]| +---+---+

  2. convertissez-le en RDD en utilisant rdd.map() avec mutables.De type WrappedArray.

    sample .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet)) .collect() .foreach(println)

    résultat:

    (1,Set(1))

1
répondu Taeheon Kwon 2018-05-02 01:04:16