Utiliser la table de copie binaire à partir de psycopg2

j'ai des dizaines de millions de lignes à transférer des fichiers de tableaux multidimensionnels dans une base de données PostgreSQL. Mes outils sont Python et psycopg2. La manière la plus efficace de regrouper les données instert est d'utiliser copy_from . Cependant, mes données sont principalement des nombres flottants de 32 bits (réel ou float4), donc je préfère ne pas convertir de réel → texte → réel. Voici un exemple de base de données DDL:

CREATE TABLE num_data
(
  id serial PRIMARY KEY NOT NULL,
  node integer NOT NULL,
  ts smallint NOT NULL,
  val1 real,
  val2 double precision
);

voici où je suis avec Python utilisant des cordes (texte):

# Just one row of data
num_row = [23253, 342, -15.336734, 2494627.949375]

import psycopg2
# Python3:
from io import StringIO
# Python2, use: from cStringIO import StringIO

conn = psycopg2.connect("dbname=mydb user=postgres")
curs = conn.cursor()

# Convert floating point numbers to text, write to COPY input
cpy = StringIO()
cpy.write('t'.join([repr(x) for x in num_row]) + 'n')

# Insert data; database converts text back to floating point numbers
cpy.seek(0)
curs.copy_from(cpy, 'num_data', columns=('node', 'ts', 'val1', 'val2'))
conn.commit()

y a-t-il un équivalent qui pourrait fonctionner en mode binaire? I. e., garder des nombres en virgule flottante binaire? Non seulement cela préserverait la précision de la pointe flottante, mais cela pourrait être plus rapide.

(Note: pour voir la même précision que l'exemple, utilisez SET extra_float_digits='2' )

25
demandé sur Mike T 2011-11-16 02:21:19

2 réponses

Voici l'équivalent binaire de la copie pour Python 3:

from io import BytesIO
from struct import pack
import psycopg2

# Two rows of data; "id" is not in the upstream data source
# Columns: node, ts, val1, val2
data = [(23253, 342, -15.336734, 2494627.949375),
        (23256, 348, 43.23524, 2494827.949375)]

conn = psycopg2.connect("dbname=mydb user=postgres")
curs = conn.cursor()

# Determine starting value for sequence
curs.execute("SELECT nextval('num_data_id_seq')")
id_seq = curs.fetchone()[0]

# Make a binary file object for COPY FROM
cpy = BytesIO()
# 11-byte signature, no flags, no header extension
cpy.write(pack('!11sii', b'PGCOPY\n7\r\n"151900920"', 0, 0))

# Columns: id, node, ts, val1, val2
# Zip: (column position, format, size)
row_format = list(zip(range(-1, 4),
                      ('i', 'i', 'h', 'f', 'd'),
                      ( 4,   4,   2,   4,   8 )))
for row in data:
    # Number of columns/fields (always 5)
    cpy.write(pack('!h', 5))
    for col, fmt, size in row_format:
        value = (id_seq if col == -1 else row[col])
        cpy.write(pack('!i' + fmt, size, value))
    id_seq += 1  # manually increment sequence outside of database

# File trailer
cpy.write(pack('!h', -1))

# Copy data to database
cpy.seek(0)
curs.copy_expert("COPY num_data FROM STDIN WITH BINARY", cpy)

# Update sequence on database
curs.execute("SELECT setval('num_data_id_seq', %s, false)", (id_seq,))
conn.commit()

mise à Jour

j'ai réécrit l'approche ci-dessus pour écrire les fichiers pour copie. Mes données en Python sont en tableaux NumPy, donc il est logique de les utiliser. Voici un exemple data avec des lignes de 1m, 7 colonnes:

import psycopg2
import numpy as np
from struct import pack
from io import BytesIO
from datetime import datetime

conn = psycopg2.connect("dbname=mydb user=postgres")
curs = conn.cursor()

# NumPy record array
shape = (7, 2000, 500)
print('Generating data with %i rows, %i columns' % (shape[1]*shape[2], shape[0]))

dtype = ([('id', 'i4'), ('node', 'i4'), ('ts', 'i2')] +
         [('s' + str(x), 'f4') for x in range(shape[0])])
data = np.empty(shape[1]*shape[2], dtype)
data['id'] = np.arange(shape[1]*shape[2]) + 1
data['node'] = np.tile(np.arange(shape[1]) + 1, shape[2])
data['ts'] = np.repeat(np.arange(shape[2]) + 1, shape[1])
data['s0'] = np.random.rand(shape[1]*shape[2]) * 100
prv = 's0'
for nxt in data.dtype.names[4:]:
    data[nxt] = data[prv] + np.random.rand(shape[1]*shape[2]) * 10
    prv = nxt

sur ma base de données, j'ai deux tables qui ressemblent à:

CREATE TABLE num_data_binary
(
  id integer PRIMARY KEY,
  node integer NOT NULL,
  ts smallint NOT NULL,
  s0 real,
  s1 real,
  s2 real,
  s3 real,
  s4 real,
  s5 real,
  s6 real
) WITH (OIDS=FALSE);

et un autre tableau similaire appelé num_data_text .

voici quelques fonctions d'aide simples pour préparer les données pour la copie (à la fois le texte et les formats binaires) en utilisant l'information dans le tableau D'enregistrement de NumPy:

def prepare_text(dat):
    cpy = BytesIO()
    for row in dat:
        cpy.write('\t'.join([repr(x) for x in row]) + '\n')
    return(cpy)

def prepare_binary(dat):
    pgcopy_dtype = [('num_fields','>i2')]
    for field, dtype in dat.dtype.descr:
        pgcopy_dtype += [(field + '_length', '>i4'),
                         (field, dtype.replace('<', '>'))]
    pgcopy = np.empty(dat.shape, pgcopy_dtype)
    pgcopy['num_fields'] = len(dat.dtype)
    for i in range(len(dat.dtype)):
        field = dat.dtype.names[i]
        pgcopy[field + '_length'] = dat.dtype[i].alignment
        pgcopy[field] = dat[field]
    cpy = BytesIO()
    cpy.write(pack('!11sii', b'PGCOPY\n7\r\n"151930920"', 0, 0))
    cpy.write(pgcopy.tostring())  # all rows
    cpy.write(pack('!h', -1))  # file trailer
    return(cpy)

Voici comment j'utilise les fonctions d'aide pour comparer les deux méthodes de format de copie:

def time_pgcopy(dat, table, binary):
    print('Processing copy object for ' + table)
    tstart = datetime.now()
    if binary:
        cpy = prepare_binary(dat)
    else:  # text
        cpy = prepare_text(dat)
    tendw = datetime.now()
    print('Copy object prepared in ' + str(tendw - tstart) + '; ' +
          str(cpy.tell()) + ' bytes; transfering to database')
    cpy.seek(0)
    if binary:
        curs.copy_expert('COPY ' + table + ' FROM STDIN WITH BINARY', cpy)
    else:  # text
        curs.copy_from(cpy, table)
    conn.commit()
    tend = datetime.now()
    print('Database copy time: ' + str(tend - tendw))
    print('        Total time: ' + str(tend - tstart))
    return

time_pgcopy(data, 'num_data_text', binary=False)
time_pgcopy(data, 'num_data_binary', binary=True)

Voici la sortie des deux dernières commandes time_pgcopy :

Processing copy object for num_data_text
Copy object prepared in 0:01:15.288695; 84355016 bytes; transfering to database
Database copy time: 0:00:37.929166
        Total time: 0:01:53.217861
Processing copy object for num_data_binary
Copy object prepared in 0:00:01.296143; 80000021 bytes; transfering to database
Database copy time: 0:00:23.325952
        Total time: 0:00:24.622095

ainsi les deux étapes NumPy → file et file → database sont beaucoup plus rapides avec l'approche binaire. La différence évidente est la façon dont Python prépare le fichier de copie, qui est vraiment lent pour le texte. D'une manière générale, le format binaire se charge dans la base de données dans les 2/3 du temps comme format texte pour ce schéma.

enfin, j'ai comparé les valeurs dans les deux tableaux de la base de données pour voir si les nombres étaient différents. Environ 1,46% des lignes ont différentes valeurs pour la colonne s0 , et cette fraction augmente à 6,17% pour s6 (probablement liée à la méthode aléatoire que j'ai utilisé). Les différences absolues non nulles entre les valeurs de flotteurs de 70M 32 bits varient entre 9.3132257 e-010 et 7.6293945 e-006. Ces petites différences entre les méthodes de chargement Texte et binaire sont dues à la perte de précision des conversions float → text → float requises pour la méthode de format texte.

31
répondu Mike T 2013-05-16 23:00:51

ici est ma version. Basé sur la version de Mike.

Sa très ad-hoc, mais il y a deux avantages:

  • s'Attendre générateur et agit comme un flux par surcharge readline
  • exemple comment écrire en format binaire hstore .
1
répondu enomad 2016-07-20 22:06:38