py4j.protocole.Py4JJavaError lors de la sélection de la colonne imbriquée dans dataframe en utilisant select statetment

j'essaie d'effectuer une tâche simple dans spark dataframe (python) qui est de créer une nouvelle dataframe en sélectionnant des colonnes spécifiques et des colonnes imbriquées à partir d'une autre dataframe par exemple:

df.printSchema()
root
 |-- time_stamp: long (nullable = true)
 |-- country: struct (nullable = true)
 |    |-- code: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- time_zone: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- order: struct (nullable = true)
 |    |-- created_at: string (nullable = true)
 |    |-- creation_type: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |-- destination: struct (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- ordering_user: struct (nullable = true)
 |    |    |-- cancellation_score: long (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- is_test: boolean (nullable = true)

df2=df.sqlContext.sql("""select a.country_code as country_code,
a.order_destination_state as order_destination_state,
a.order_ordering_user_id as order_ordering_user_id,
a.order_ordering_user_is_test as order_ordering_user_is_test,
a.time_stamp as time_stamp
from
(select
flat_order_creation.order.destination.state as order_destination_state,
flat_order_creation.order.ordering_user.id as order_ordering_user_id,
flat_order_creation.order.ordering_user.is_test as   order_ordering_user_is_test,
flat_order_creation.time_stamp as time_stamp
from flat_order_creation) a""")

et j'obtiens l'erreur suivante:

Traceback (most recent call last):
  File "/home/hadoop/scripts/orders_all.py", line 180, in <module>
    df2=sqlContext.sql(q)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/context.py", line 552, in sql
  File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36, in deco
  File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o60.sql.
: java.lang.RuntimeException: [6.21] failure: ``*'' expected but `order' found

flat_order_creation.order.destination.state as order_destination_state,

j'utilise spark-submit avec master en mode local pour exécuter ce code. il est important de mentionner que lorsque je me connecte au shell pyspark et exécute le code (ligne par ligne) cela fonctionne, mais quand le soumettre (même en mode local) ne fonctionne pas. une autre chose importante à mentionner est que lors de la sélection d'un champ non imbriqué il fonctionne aussi bien. J'utilise spark 1.5.2 sur EMR (version 4.2.0)

2
demandé sur zero323 2016-01-26 16:46:20

1 réponses

sans un exemple Minimal, complet et vérifiable Je ne peux que deviner mais il semble que vous utilisez des implémentations SparkContext différentes dans le shell interactif et votre programme autonome.

aussi longtemps que les binaires Spark ont été construits avec le support De La Ruche sqlContext fourni dans le shell est un HiveContext . Entre autres différences, il fournit un analyseur SQL plus sophistiqué qu'un simple SQLContext . Vous pouvez facilement reproduisez votre problème comme suit:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext

val conf: SparkConf = ???
val sc: SparkContext = ???
val query = "SELECT df.foobar.order FROM df"

val hiveContext: SQLContext = new HiveContext(sc)
val sqlContext: SQLContext = new SQLContext(sc)
val json = sc.parallelize(Seq("""{"foobar": {"order": 1}}"""))

sqlContext.read.json(json).registerTempTable("df")
sqlContext.sql(query).show
// java.lang.RuntimeException: [1.18] failure: ``*'' expected but `order' found
// ...

hiveContext.read.json(json).registerTempTable("df")
hiveContext.sql(query)
// org.apache.spark.sql.DataFrame = [order: bigint]

initialisation sqlContext avec HiveContext dans le programme autonome devrait faire l'affaire:

from pyspark.sql import HiveContext

sqlContext = HiveContext(sc) 

df = sqlContext.createDataFrame(...)
df.registerTempTable("flat_order_creation")

sqlContext.sql(...)

il est important de noter que le problème ne se pose pas en lui-même, mais en utilisant le mot-clé ORDER comme nom de colonne. Donc, si utiliser HiveContext n'est pas une option, il suffit de changer le nom du champ en quelque chose d'autre.

4
répondu zero323 2016-01-26 14:35:05