Comment convertir un fichier csv en parquet
7 réponses
Ici est un exemple de code qui le fait dans les deux sens.
Vous pouvez utiliser Exercice Apache, comme décrit dans convertissez un fichier CSV en Parquet Apache avec Drill.
En bref:
Démarrer Apache Drill:
$ cd /opt/drill/bin $ sqlline -u jdbc:drill:zk=local
Créer le Parquet de fichier:
-- Set default table format to parquet ALTER SESSION SET `store.format`='parquet'; -- Create a parquet table containing all data from the CSV table CREATE TABLE dfs.tmp.`/stats/airport_data/` AS SELECT CAST(SUBSTR(columns[0],1,4) AS INT) `YEAR`, CAST(SUBSTR(columns[0],5,2) AS INT) `MONTH`, columns[1] as `AIRLINE`, columns[2] as `IATA_CODE`, columns[3] as `AIRLINE_2`, columns[4] as `IATA_CODE_2`, columns[5] as `GEO_SUMMARY`, columns[6] as `GEO_REGION`, columns[7] as `ACTIVITY_CODE`, columns[8] as `PRICE_CODE`, columns[9] as `TERMINAL`, columns[10] as `BOARDING_AREA`, CAST(columns[11] AS DOUBLE) as `PASSENGER_COUNT` FROM dfs.`/opendata/Passenger/SFO_Passenger_Data/*.csv`;
Essayez de sélectionner les données de la nouvelle Parquet fichier:
-- Select data from parquet table SELECT * FROM dfs.tmp.`/stats/airport_data/*`
Vous pouvez changer le dfs.tmp
localisation en allant à http://localhost:8047/storage/dfs
(source: CSV et Parquet).
Le code suivant est un exemple d'utilisation de spark2.0. La lecture est beaucoup plus rapide que l'option inferSchema. Spark 2.0 convertir en fichier de parquet dans beaucoup plus efficace que spark1.6.
import org.apache.spark.sql.types._
var df = StructType(Array(StructField("timestamp", StringType, true),StructField("site", StringType, true),StructField("requests", LongType, true) ))
df = spark.read
.schema(df)
.option("header", "true")
.option("delimiter", "\t")
.csv("/user/hduser/wikipedia/pageviews-by-second-tsv")
df.write.parquet("/user/hduser/wikipedia/pageviews-by-second-parquet")
j'ai déjà posté réponse sur la façon d'utiliser Apache Drill. Cependant, si vous êtes familier avec Python, vous pouvez maintenant le faire en utilisant Pandas et PyArrow!
installer des dépendances
en utilisant pip
:
pip install pandas pyarrow
ou en utilisant conda
:
conda install pandas pyarrow -c conda-forge
convertir CSV en Parquet en morceaux
# csv_to_parquet.py
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
csv_file = '/path/to/my.tsv'
chunksize = 100_000
csv_stream = pd.read_csv(csv_file, sep='\t', chunksize=chunksize, low_memory=False)
for i, chunk in enumerate(csv_stream):
print("Chunk", i)
if i == 0:
# Guess the schema of the CSV file from the first chunk
parquet_schema = pa.Table.from_pandas(df=chunk).schema
# Open a Parquet file for writing
parquet_writer = pq.ParquetWriter(parquet_file, parquet_schema, compression='snappy')
# Write CSV chunk to the parquet file
table = pa.Table.from_pandas(chunk, schema=parquet_schema)
parquet_writer.write_table(table)
parquet_writer.close()
Je n'ai pas comparé ce code avec la version de foret Apache, mais dans mon l'expérience c'est beaucoup rapide, convertissant des dizaines de milliers de lignes par seconde (cela dépend du fichier CSV bien sûr!).
1) vous pouvez créer une table ruche externe
create external table emp(name string,job_title string,department string,salary_per_year int)
row format delimited
fields terminated by ','
location '.. hdfs location of csv file '
2) Une autre table Ruche qui stockera le fichier de parquet
create external table emp_par(name string,job_title string,department string,salary_per_year int)
row format delimited
stored as PARQUET
location 'hdfs location were you want the save parquet file'
insérer les données du tableau 1 dans le tableau 2:
insert overwrite table emp_par select * from emp
lire les fichiers csv comme Dataframe dans Apache Sparkpackage spark-csv. après avoir chargé des données dans Dataframe, sauvegardez dataframe dans parquetfile.
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.option("mode", "DROPMALFORMED")
.load("/home/myuser/data/log/*.csv")
df.saveAsParquetFile("/home/myuser/data.parquet")
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import sys
sc = SparkContext(appName="CSV2Parquet")
sqlContext = SQLContext(sc)
schema = StructType([
StructField("col1", StringType(), True),
StructField("col2", StringType(), True),
StructField("col3", StringType(), True),
StructField("col4", StringType(), True),
StructField("col5", StringType(), True)])
rdd = sc.textFile('/input.csv').map(lambda line: line.split(","))
df = sqlContext.createDataFrame(rdd, schema)
df.write.parquet('/output.parquet')