File D'attente en tant que table SQL avec plusieurs consommateurs (PostgreSQL)
j'ai un typique producteur-consommateur problème:
les demandes de plusieurs producteurs écrivent des demandes d'emploi à une table de travail sur une base de données PostgreSQL.
les demandes d'emploi ont un champ d'état qui commence contient la file D'attente sur la création.
Il y a multiple consumer applications qui sont avertis en règle générale, quand un producteur insère un nouveau record:
CREATE OR REPLACE RULE "jobrecord.added" AS
ON INSERT TO jobrecord DO
NOTIFY "jobrecordAdded";
ils vont essayer pour réserver un nouveau record en mettant son état à réservé. Bien sûr, seulement sur le consommateur devrait réussir. Tous les autres consommateurs ne devraient pas pouvoir réserver le même enregistrement. Ils devraient plutôt réserver d'autres enregistrements avec state=QUEUED.
exemple: certains producteurs ont ajouté les enregistrements suivants au tableau jobrecord :
id state owner payload
------------------------
1 QUEUED null <data>
2 QUEUED null <data>
3 QUEUED null <data>
4 QUEUED null <data>
maintenant, deux consommateurs Un , B voulez traiter ils. Ils se mettent à courir en même temps. On devrait réserver id 1, l'autre devrait réserver id 2, puis le premier qui termine devrait réserver id 3 et ainsi de suite..
dans un monde pur multithreaded, j'utiliserais un mutex pour contrôler l'accès à la file d'attente de travail, mais les consommateurs sont des processus différents qui peuvent fonctionner sur des machines différentes. Ils n'ont accès qu'à la même base de données, donc toute synchronisation doit se faire à travers la base de données.
j'ai lu beaucoup de documentation sur l'accès et le verrouillage simultanés dans PostgreSQL, par exemple http://www.postgresql.org/docs/9.0/interactive/explicit-locking.html Sélectionnez déverrouillé ligne dans Postgresql PostgreSQL et verrouillage
de ces sujets, j'ai appris, que la déclaration SQL suivante devrait faire ce dont j'ai besoin:
UPDATE jobrecord
SET owner= :owner, state = :reserved
WHERE id = (
SELECT id from jobrecord WHERE state = :queued
ORDER BY id LIMIT 1
)
RETURNING id; // will only return an id when they reserved it successfully
malheureusement, quand je l'exécute dans plusieurs processus de consommation, dans environ 50% des cas, ils réservent toujours le même enregistrement, tant en le traitant qu'en réécrivant les changements de l'autre.
Qu'est-ce que je rate? Comment dois-je écrire la déclaration SQL pour que plusieurs consommateurs ne réservent pas le même enregistrement?
7 réponses
lire mon message ici:
cohérence dans postgresql avec verrouillage et sélectionner pour mise à jour
si vous utilisez la table de transaction et de verrouillage vous n'aurez aucun problème.
j'utilise postgres pour une file D'attente FIFO aussi. À l'origine, J'ai utilisé ACCESS EXCLUSIVE, ce qui donne des résultats corrects en haute concurrence, mais a l'effet malheureux d'être mutuellement exclusive avec pg_dump, qui acquiert un verrou de part D'accès pendant son exécution. Cela provoque le verrouillage de ma fonction next() pendant une très longue période (la durée de pg_dump). Ce n'était pas acceptable puisque nous sommes un magasin 24x7 et les clients n'ont pas aimé le temps mort sur la queue au milieu de la nuit.
je me suis dit qu'il devait y avoir un verrou moins restrictif qui serait tout de même simultané-sûr et non verrouillé pendant que pg_dump est en cours d'exécution. Ma recherche m'a conduit à cette SORTE de post.
Puis j'ai fait quelques recherches.
les modes suivants sont suffisants pour une fonction FIFO file NEXT() qui va mettre à jour l'état d'une tâche de file à tournant sans aucune défaillance de concurrence, et aussi ne pas bloquer contre pg_dump:
SHARE UPDATE EXCLUSIVE
SHARE ROW EXCLUSIVE
EXCLUSIVE
Requête:
begin;
lock table tx_test_queue in exclusive mode;
update
tx_test_queue
set
status='running'
where
job_id in (
select
job_id
from
tx_test_queue
where
status='queued'
order by
job_id asc
limit 1
)
returning job_id;
commit;
résultat ressemble à:
UPDATE 1
job_id
--------
98
(1 row)
voici un script shell qui teste tous les différents modes de verrouillage à haute simultanéité (30).
#!/bin/bash
# RESULTS, feel free to repro yourself
#
# noLock FAIL
# accessShare FAIL
# rowShare FAIL
# rowExclusive FAIL
# shareUpdateExclusive SUCCESS
# share FAIL+DEADLOCKS
# shareRowExclusive SUCCESS
# exclusive SUCCESS
# accessExclusive SUCCESS, but LOCKS against pg_dump
#config
strategy="exclusive"
db=postgres
dbuser=postgres
queuecount=100
concurrency=30
# code
psql84 -t -U $dbuser $db -c "create table tx_test_queue (job_id serial, status text);"
# empty queue
psql84 -t -U $dbuser $db -c "truncate tx_test_queue;";
echo "Simulating 10 second pg_dump with ACCESS SHARE"
psql84 -t -U $dbuser $db -c "lock table tx_test_queue in ACCESS SHARE mode; select pg_sleep(10); select 'pg_dump finished...'" &
echo "Starting workers..."
# queue $queuecount items
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -q -U $dbuser $db -c "insert into tx_test_queue (status) values ('queued');"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
# process $queuecount w/concurrency of $concurrency
case $strategy in
"noLock") strategySql="update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"accessShare") strategySql="lock table tx_test_queue in ACCESS SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"rowShare") strategySql="lock table tx_test_queue in ROW SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"rowExclusive") strategySql="lock table tx_test_queue in ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"shareUpdateExclusive") strategySql="lock table tx_test_queue in SHARE UPDATE EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"share") strategySql="lock table tx_test_queue in SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"shareRowExclusive") strategySql="lock table tx_test_queue in SHARE ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"exclusive") strategySql="lock table tx_test_queue in EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
"accessExclusive") strategySql="lock table tx_test_queue in ACCESS EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
*) echo "Unknown strategy $strategy";;
esac
echo $strategySql
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -U $dbuser $db -c "$strategySql"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
psql84 -U $dbuser $db -c "select count(distinct(status)) as should_output_100 from tx_test_queue;"
psql84 -t -U $dbuser $db -c "drop table tx_test_queue;";
Le Code est ici aussi si vous voulez éditer: https://gist.github.com/1083936
je mets à jour mon application pour utiliser le mode exclusif car il est mode le plus restrictif qu'un) est correct et b) n'entre pas en conflit avec pg_dump. J'ai choisi le plus restrictif car il semble le moins risqué en termes de changer l'application D'accès EXCLUSIVE sans être un uber-expert en postgres de verrouillage.
je me sens assez à l'aise avec mon banc d'essai et avec les idées générales derrière la réponse. J'espère que partager cela aidera à résoudre ce problème pour d'autres.
pas besoin de faire un verrouillage de table entier pour cela :\.
une serrure de rangée créée avec for update
fonctionne très bien.
voir https://gist.github.com/mackross/a49b72ad8d24f7cefc32 pour le changement que j'ai fait à la réponse d'apinstein et vérifié qu'elle fonctionne toujours.
le code Final est
update
tx_test_queue
set
status='running'
where
job_id in (
select
job_id
from
tx_test_queue
where
status='queued'
order by
job_id asc
limit 1 for update
)
returning job_id;
et juste select?
SELECT * FROM table WHERE status = 'QUEUED' LIMIT 10 FOR UPDATE SKIP LOCKED;
https://www.postgresql.org/docs/9.5/static/sql-select.html#SQL-FOR-UPDATE-SHARE
vous pourriez vouloir regarder comment queue_classic le fait. https://github.com/ryandotsmith/queue_classic
Le code est assez court et facile à comprendre.
ok, voici la solution qui fonctionne pour moi, basée sur le lien de jordani. Comme certains de mes problèmes étaient dans la façon dont Qt-SQL fonctionne, j'ai inclus le code Qt:
QSqlDatabase db = GetDatabase();
db.transaction();
QSqlQuery lockQuery(db);
bool lockResult = lockQuery.exec("LOCK TABLE serverjobrecord IN ACCESS EXCLUSIVE MODE; ");
QSqlQuery query(db);
query.prepare(
"UPDATE jobrecord "
" SET \"owner\"= :owner, state = :reserved "
" WHERE id = ( "
" SELECT id from jobrecord WHERE state = :queued ORDER BY id LIMIT 1 "
" ) RETURNING id;"
);
query.bindValue(":owner", pid);
query.bindValue(":reserved", JobRESERVED);
query.bindValue(":queued", JobQUEUED);
bool result = query.exec();
Pour vérifier, si plusieurs consommateurs processus même travail, j'ai ajouté une règle et d'un journal-table:
CREATE TABLE serverjobrecord_log
(
serverjobrecord_id integer,
oldowner text,
newowner text
) WITH ( OIDS=FALSE );
CREATE OR REPLACE RULE ownerrule AS ON UPDATE TO jobrecord
WHERE old.owner IS NOT NULL AND new.state = 1
DO INSERT INTO jobrecord_log (id, oldowner, newowner)
VALUES (new.id, old.owner, new.owner);
sans la mention LOCK TABLE serverjobrecord IN ACCESS EXCLUSIVE MODE;
, la table de logarithme se remplit occasionnellement d'entrées, lorsque l'un des consommateurs a écrasé les valeurs d'un autre, mais en utilisant la déclaration de verrouillage, la table de journal reste vide: -)