Enregistrez la base de données Spark comme table de partitionnement dynamique dans la ruche
j'ai un exemple d'application qui fonctionne pour lire des fichiers csv dans une base de données. La dataframe peut être stockée sur une table ruche au format parquet en utilisant la méthode
df.saveAsTable(tablename,mode)
.
le code ci-dessus fonctionne bien, mais j'ai tellement de données pour chaque jour que je veux la partition dynamique de la table ruche basée sur la creationdate(colonne dans la table).
y a-t-il un moyen de cloisonner dynamiquement la base de données et de la stocker dans un entrepôt ruche. Vouloir s'abstenir de Codage physique de l'énoncé insert à l'aide de hivesqlcontext.sql(insert into table partittioin by(date)....)
.
Question peut être considéré comme une extension : comment sauvegarder DataFrame directement dans Hive?
toute aide est très appréciée.
5 réponses
je crois qu'il fonctionne quelque chose comme ceci:
df
est un dataframe avec l'année, le mois et les autres colonnes
df.write.partitionBy('year', 'month').saveAsTable(...)
ou
df.write.partitionBy('year', 'month').insertInto(...)
j'ai pu écrire à la table de ruche partitionnée en utilisant df.write().mode(SaveMode.Append).partitionBy("colname").saveAsTable("Table")
j'ai dû activer les propriétés suivantes pour que cela fonctionne.
hiveContext.setConf("hive.exec.dynamic.partition", "true") hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
j'ai aussi fait face à la même chose mais en utilisant les trucs suivants j'ai résolu.
quand nous faisons n'importe quelle table comme partitionné puis colonne partitionnée deviennent sensibles à la casse.
la colonne partitionnée doit être présente dans DataFrame avec le même nom (sensible à la casse). Code:
var dbName="your database name" var finaltable="your table name" // First check if table is available or not.. if (sparkSession.sql("show tables in " + dbName).filter("tableName='" +finaltable + "'").collect().length == 0) { //If table is not available then it will create for you.. println("Table Not Present \n Creating table " + finaltable) sparkSession.sql("use Database_Name") sparkSession.sql("SET hive.exec.dynamic.partition = true") sparkSession.sql("SET hive.exec.dynamic.partition.mode = nonstrict ") sparkSession.sql("SET hive.exec.max.dynamic.partitions.pernode = 400") sparkSession.sql("create table " + dbName +"." + finaltable + "(EMP_ID string,EMP_Name string,EMP_Address string,EMP_Salary bigint) PARTITIONED BY (EMP_DEP STRING)") //Table is created now insert the DataFrame in append Mode df.write.mode(SaveMode.Append).insertInto(empDB + "." + finaltable) }
C'est ce qui fonctionne pour moi. J'ai paramétré ces paramètres, puis j'ai placé les données dans des tables partitionnées.
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode",
"nonstrict")
cela a fonctionné pour moi en utilisant python et spark 2.1.0.
Pas sûr si c'est la meilleure façon de le faire, mais il fonctionne...
# WRITE DATA INTO A HIVE TABLE
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.master("local[*]") \
.config("hive.exec.dynamic.partition", "true") \
.config("hive.exec.dynamic.partition.mode", "nonstrict") \
.enableHiveSupport() \
.getOrCreate()
### CREATE HIVE TABLE (with one row)
spark.sql("""
CREATE TABLE IF NOT EXISTS hive_df (col1 INT, col2 STRING, partition_bin INT)
USING HIVE OPTIONS(fileFormat 'PARQUET')
PARTITIONED BY (partition_bin)
LOCATION 'hive_df'
""")
spark.sql("""
INSERT INTO hive_df PARTITION (partition_bin = 0)
VALUES (0, 'init_record')
""")
###
### CREATE NON HIVE TABLE (with one row)
spark.sql("""
CREATE TABLE IF NOT EXISTS non_hive_df (col1 INT, col2 STRING, partition_bin INT)
USING PARQUET
PARTITIONED BY (partition_bin)
LOCATION 'non_hive_df'
""")
spark.sql("""
INSERT INTO non_hive_df PARTITION (partition_bin = 0)
VALUES (0, 'init_record')
""")
###
### ATTEMPT DYNAMIC OVERWRITE WITH EACH TABLE
spark.sql("""
INSERT OVERWRITE TABLE hive_df PARTITION (partition_bin)
VALUES (0, 'new_record', 1)
""")
spark.sql("""
INSERT OVERWRITE TABLE non_hive_df PARTITION (partition_bin)
VALUES (0, 'new_record', 1)
""")
spark.sql("SELECT * FROM hive_df").show() # 2 row dynamic overwrite
spark.sql("SELECT * FROM non_hive_df").show() # 1 row full table overwrite