Comment définir schema pour custom type dans Spark SQL?

l'exemple de code suivant tente de mettre des objets case dans une dataframe. Le code inclut la définition d'une hiérarchie d'objet de cas et d'une classe de cas en utilisant ce trait:

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext

sealed trait Some
case object AType extends Some
case object BType extends Some

case class Data( name : String, t: Some)

object Example {
  def main(args: Array[String]) : Unit = {
    val conf = new SparkConf()
      .setAppName( "Example" )
      .setMaster( "local[*]")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._

    val df = sc.parallelize( Seq( Data( "a", AType), Data( "b", BType) ), 4).toDF()
    df.show()
  }
}    

lors de l'exécution du code, je rencontre malheureusement l'exception suivante:

java.lang.UnsupportedOperationException: Schema for type Some is not supported

Questions

  • est-il possible d'ajouter ou de définir un schéma pour certains types (ici tapez Some )?
  • Existe-t-il une autre approche pour représenter ce genre d'énumération?
    • j'ai essayé d'utiliser Enumeration directement, mais également sans succès. (voir ci-dessous)

Code pour Enumeration :

object Some extends Enumeration {
  type Some = Value
  val AType, BType = Value
}

Merci d'avance. J'espère que la meilleure approche n'est pas d'utiliser des cordes à la place.

20
demandé sur Jacek Laskowski 2015-09-07 16:59:20

1 réponses

Étincelle 2.0.0+ :

UserDefinedType a été rendu privé dans Spark 2.0.0 et comme pour l'instant il n'a pas de remplacement amical Dataset .

voir: SPARK-14155 (Hide UserDefinedType in Spark 2.0)

la Plupart du temps statiquement typé Dataset peut servir de remplacement Il ya un JIRA en attente SPARK-7768 pour rendre L'API UDT public encore une fois avec la version 2.4 de target.

Voir aussi Comment stocker des objets personnalisés dans le Dataset?

étincelle < 2.0.0

y a-t-il une possibilité d'ajouter ou de définir un schéma pour certains types (ici tapez certains)?

je suppose que la réponse dépend à quel point vous en avez besoin. Il semble qu'il soit possible de créer un UserDefinedType mais elle exige l'accès à DeveloperApi et n'est pas exactement simple ou bien documentée.

import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[SomeUDT])
sealed trait Some
case object AType extends Some
case object BType extends Some

class SomeUDT extends UserDefinedType[Some] {
  override def sqlType: DataType = IntegerType

  override def serialize(obj: Any) = {
    obj match {
      case AType => 0
      case BType => 1
    }
  }

  override def deserialize(datum: Any): Some = {
    datum match {
      case 0 => AType
      case 1 => BType
    }
  }

  override def userClass: Class[Some] = classOf[Some]
}

vous devriez probablement remplacer hashCode et equals aussi.

son homologue de PySpark peut ressembler à ceci:

from enum import Enum, unique
from pyspark.sql.types import UserDefinedType, IntegerType

class SomeUDT(UserDefinedType):
    @classmethod
    def sqlType(self):
        return IntegerType()

    @classmethod
    def module(cls):
        return cls.__module__

    @classmethod 
    def scalaUDT(cls): # Required in Spark < 1.5
        return 'net.zero323.enum.SomeUDT'

    def serialize(self, obj):
        return obj.value

    def deserialize(self, datum):
        return {x.value: x for x in Some}[datum]

@unique
class Some(Enum):
    __UDT__ = SomeUDT()
    AType = 0
    BType = 1

Spark < 1.5 Python UDT nécessite un jumelé Scala UDT, mais il n'est plus le cas dans la version 1.5.

pour un UDT simple comme vous pouvez utiliser simple types (par exemple IntegerType au lieu de Struct en entier ).

17
répondu zero323 2018-08-10 20:35:56