Comment définir schema pour custom type dans Spark SQL?
l'exemple de code suivant tente de mettre des objets case dans une dataframe. Le code inclut la définition d'une hiérarchie d'objet de cas et d'une classe de cas en utilisant ce trait:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext
sealed trait Some
case object AType extends Some
case object BType extends Some
case class Data( name : String, t: Some)
object Example {
def main(args: Array[String]) : Unit = {
val conf = new SparkConf()
.setAppName( "Example" )
.setMaster( "local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val df = sc.parallelize( Seq( Data( "a", AType), Data( "b", BType) ), 4).toDF()
df.show()
}
}
lors de l'exécution du code, je rencontre malheureusement l'exception suivante:
java.lang.UnsupportedOperationException: Schema for type Some is not supported
Questions
- est-il possible d'ajouter ou de définir un schéma pour certains types (ici tapez
Some
)? - Existe-t-il une autre approche pour représenter ce genre d'énumération?
- j'ai essayé d'utiliser
Enumeration
directement, mais également sans succès. (voir ci-dessous)
- j'ai essayé d'utiliser
Code pour Enumeration
:
object Some extends Enumeration {
type Some = Value
val AType, BType = Value
}
Merci d'avance. J'espère que la meilleure approche n'est pas d'utiliser des cordes à la place.
1 réponses
Étincelle 2.0.0+ :
UserDefinedType
a été rendu privé dans Spark 2.0.0 et comme pour l'instant il n'a pas de remplacement amical Dataset
.
voir: SPARK-14155 (Hide UserDefinedType in Spark 2.0)
la Plupart du temps statiquement typé Dataset
peut servir de remplacement
Il ya un JIRA en attente SPARK-7768 pour rendre L'API UDT public encore une fois avec la version 2.4 de target.
Voir aussi Comment stocker des objets personnalisés dans le Dataset?
étincelle < 2.0.0
y a-t-il une possibilité d'ajouter ou de définir un schéma pour certains types (ici tapez certains)?
je suppose que la réponse dépend à quel point vous en avez besoin. Il semble qu'il soit possible de créer un UserDefinedType
mais elle exige l'accès à DeveloperApi
et n'est pas exactement simple ou bien documentée.
import org.apache.spark.sql.types._
@SQLUserDefinedType(udt = classOf[SomeUDT])
sealed trait Some
case object AType extends Some
case object BType extends Some
class SomeUDT extends UserDefinedType[Some] {
override def sqlType: DataType = IntegerType
override def serialize(obj: Any) = {
obj match {
case AType => 0
case BType => 1
}
}
override def deserialize(datum: Any): Some = {
datum match {
case 0 => AType
case 1 => BType
}
}
override def userClass: Class[Some] = classOf[Some]
}
vous devriez probablement remplacer hashCode
et equals
aussi.
son homologue de PySpark peut ressembler à ceci:
from enum import Enum, unique
from pyspark.sql.types import UserDefinedType, IntegerType
class SomeUDT(UserDefinedType):
@classmethod
def sqlType(self):
return IntegerType()
@classmethod
def module(cls):
return cls.__module__
@classmethod
def scalaUDT(cls): # Required in Spark < 1.5
return 'net.zero323.enum.SomeUDT'
def serialize(self, obj):
return obj.value
def deserialize(self, datum):
return {x.value: x for x in Some}[datum]
@unique
class Some(Enum):
__UDT__ = SomeUDT()
AType = 0
BType = 1
Spark < 1.5 Python UDT nécessite un jumelé Scala UDT, mais il n'est plus le cas dans la version 1.5.
pour un UDT simple comme vous pouvez utiliser simple types (par exemple IntegerType
au lieu de Struct
en entier ).