File D'attente en tant que table SQL avec plusieurs consommateurs (PostgreSQL)

j'ai un typique producteur-consommateur problème:

les demandes de plusieurs producteurs écrivent des demandes d'emploi à une table de travail sur une base de données PostgreSQL.

les demandes d'emploi ont un champ d'état qui commence contient la file D'attente sur la création.

Il y a multiple consumer applications qui sont avertis en règle générale, quand un producteur insère un nouveau record:

CREATE OR REPLACE RULE "jobrecord.added" AS
  ON INSERT TO jobrecord DO 
  NOTIFY "jobrecordAdded";

ils vont essayer pour réserver un nouveau record en mettant son état à réservé. Bien sûr, seulement sur le consommateur devrait réussir. Tous les autres consommateurs ne devraient pas pouvoir réserver le même enregistrement. Ils devraient plutôt réserver d'autres enregistrements avec state=QUEUED.

exemple: certains producteurs ont ajouté les enregistrements suivants au tableau jobrecord :

id state  owner  payload
------------------------
1 QUEUED null   <data>
2 QUEUED null   <data>
3 QUEUED null   <data>
4 QUEUED null   <data>

maintenant, deux consommateurs Un , B voulez traiter ils. Ils se mettent à courir en même temps. On devrait réserver id 1, l'autre devrait réserver id 2, puis le premier qui termine devrait réserver id 3 et ainsi de suite..

dans un monde pur multithreaded, j'utiliserais un mutex pour contrôler l'accès à la file d'attente de travail, mais les consommateurs sont des processus différents qui peuvent fonctionner sur des machines différentes. Ils n'ont accès qu'à la même base de données, donc toute synchronisation doit se faire à travers la base de données.

j'ai lu beaucoup de documentation sur l'accès et le verrouillage simultanés dans PostgreSQL, par exemple http://www.postgresql.org/docs/9.0/interactive/explicit-locking.html Sélectionnez déverrouillé ligne dans Postgresql PostgreSQL et verrouillage

de ces sujets, j'ai appris, que la déclaration SQL suivante devrait faire ce dont j'ai besoin:

UPDATE jobrecord
  SET owner= :owner, state = :reserved 
  WHERE id = ( 
     SELECT id from jobrecord WHERE state = :queued 
        ORDER BY id  LIMIT 1 
     ) 
  RETURNING id;  // will only return an id when they reserved it successfully

malheureusement, quand je l'exécute dans plusieurs processus de consommation, dans environ 50% des cas, ils réservent toujours le même enregistrement, tant en le traitant qu'en réécrivant les changements de l'autre.

Qu'est-ce que je rate? Comment dois-je écrire la déclaration SQL pour que plusieurs consommateurs ne réservent pas le même enregistrement?

30
demandé sur Community 2011-06-28 17:48:27

7 réponses

lire mon message ici:

cohérence dans postgresql avec verrouillage et sélectionner pour mise à jour

si vous utilisez la table de transaction et de verrouillage vous n'aurez aucun problème.

4
répondu jordani 2017-05-23 12:26:15

j'utilise postgres pour une file D'attente FIFO aussi. À l'origine, J'ai utilisé ACCESS EXCLUSIVE, ce qui donne des résultats corrects en haute concurrence, mais a l'effet malheureux d'être mutuellement exclusive avec pg_dump, qui acquiert un verrou de part D'accès pendant son exécution. Cela provoque le verrouillage de ma fonction next() pendant une très longue période (la durée de pg_dump). Ce n'était pas acceptable puisque nous sommes un magasin 24x7 et les clients n'ont pas aimé le temps mort sur la queue au milieu de la nuit.

je me suis dit qu'il devait y avoir un verrou moins restrictif qui serait tout de même simultané-sûr et non verrouillé pendant que pg_dump est en cours d'exécution. Ma recherche m'a conduit à cette SORTE de post.

Puis j'ai fait quelques recherches.

les modes suivants sont suffisants pour une fonction FIFO file NEXT() qui va mettre à jour l'état d'une tâche de file à tournant sans aucune défaillance de concurrence, et aussi ne pas bloquer contre pg_dump:

SHARE UPDATE EXCLUSIVE
SHARE ROW EXCLUSIVE
EXCLUSIVE

Requête:

begin;
lock table tx_test_queue in exclusive mode;
update 
    tx_test_queue
set 
    status='running'
where
    job_id in (
        select
            job_id
        from
            tx_test_queue
        where
            status='queued'
        order by 
            job_id asc
        limit 1
    )
returning job_id;
commit;

résultat ressemble à:

UPDATE 1
 job_id
--------
     98
(1 row)

voici un script shell qui teste tous les différents modes de verrouillage à haute simultanéité (30).

#!/bin/bash
# RESULTS, feel free to repro yourself
#
# noLock                    FAIL
# accessShare               FAIL
# rowShare                  FAIL
# rowExclusive              FAIL
# shareUpdateExclusive      SUCCESS
# share                     FAIL+DEADLOCKS
# shareRowExclusive         SUCCESS
# exclusive                 SUCCESS
# accessExclusive           SUCCESS, but LOCKS against pg_dump

#config
strategy="exclusive"

db=postgres
dbuser=postgres
queuecount=100
concurrency=30

# code
psql84 -t -U $dbuser $db -c "create table tx_test_queue (job_id serial, status text);"
# empty queue
psql84 -t -U $dbuser $db -c "truncate tx_test_queue;";
echo "Simulating 10 second pg_dump with ACCESS SHARE"
psql84 -t -U $dbuser $db -c "lock table tx_test_queue in ACCESS SHARE mode; select pg_sleep(10); select 'pg_dump finished...'" &

echo "Starting workers..."
# queue $queuecount items
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -q -U $dbuser $db -c "insert into tx_test_queue (status) values ('queued');"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
# process $queuecount w/concurrency of $concurrency
case $strategy in
    "noLock")               strategySql="update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessShare")          strategySql="lock table tx_test_queue in ACCESS SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowShare")             strategySql="lock table tx_test_queue in ROW SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowExclusive")         strategySql="lock table tx_test_queue in ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareUpdateExclusive") strategySql="lock table tx_test_queue in SHARE UPDATE EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "share")                strategySql="lock table tx_test_queue in SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareRowExclusive")    strategySql="lock table tx_test_queue in SHARE ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "exclusive")            strategySql="lock table tx_test_queue in EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessExclusive")      strategySql="lock table tx_test_queue in ACCESS EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    *) echo "Unknown strategy $strategy";;
esac
echo $strategySql
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -U $dbuser $db -c "$strategySql"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
psql84 -U $dbuser $db -c "select count(distinct(status)) as should_output_100 from tx_test_queue;"
psql84 -t -U $dbuser $db -c "drop table tx_test_queue;";
Le Code

est ici aussi si vous voulez éditer: https://gist.github.com/1083936

je mets à jour mon application pour utiliser le mode exclusif car il est mode le plus restrictif qu'un) est correct et b) n'entre pas en conflit avec pg_dump. J'ai choisi le plus restrictif car il semble le moins risqué en termes de changer l'application D'accès EXCLUSIVE sans être un uber-expert en postgres de verrouillage.

je me sens assez à l'aise avec mon banc d'essai et avec les idées générales derrière la réponse. J'espère que partager cela aidera à résoudre ce problème pour d'autres.

32
répondu apinstein 2011-07-15 03:17:43

pas besoin de faire un verrouillage de table entier pour cela :\.

une serrure de rangée créée avec for update fonctionne très bien.

voir https://gist.github.com/mackross/a49b72ad8d24f7cefc32 pour le changement que j'ai fait à la réponse d'apinstein et vérifié qu'elle fonctionne toujours.

le code Final est

update 
    tx_test_queue
set 
    status='running'
where
    job_id in (
        select
            job_id
        from
            tx_test_queue
        where
            status='queued'
        order by 
            job_id asc
        limit 1 for update
    )
returning job_id;
14
répondu mackross 2015-05-19 01:36:56

et juste select?

SELECT * FROM table WHERE status = 'QUEUED' LIMIT 10 FOR UPDATE SKIP LOCKED;

https://www.postgresql.org/docs/9.5/static/sql-select.html#SQL-FOR-UPDATE-SHARE

4
répondu Vladimir Filipchenko 2018-01-08 09:28:48

vous pourriez vouloir regarder comment queue_classic le fait. https://github.com/ryandotsmith/queue_classic

Le code est assez court et facile à comprendre.

2
répondu Joe Van Dyk 2012-10-01 18:34:44

Check out PgQ au lieu de réinventer la roue.

-1
répondu Sean 2011-06-28 18:17:14

ok, voici la solution qui fonctionne pour moi, basée sur le lien de jordani. Comme certains de mes problèmes étaient dans la façon dont Qt-SQL fonctionne, j'ai inclus le code Qt:

QSqlDatabase db = GetDatabase();
db.transaction();
QSqlQuery lockQuery(db);
bool lockResult = lockQuery.exec("LOCK TABLE serverjobrecord IN ACCESS EXCLUSIVE MODE; ");
QSqlQuery query(db);
query.prepare(    
"UPDATE jobrecord "
"  SET \"owner\"= :owner, state = :reserved "
"  WHERE id = ( "
"    SELECT id from jobrecord WHERE state = :queued ORDER BY id LIMIT 1 "
"  ) RETURNING id;"
);
query.bindValue(":owner", pid);
query.bindValue(":reserved", JobRESERVED);
query.bindValue(":queued", JobQUEUED); 
bool result = query.exec();

Pour vérifier, si plusieurs consommateurs processus même travail, j'ai ajouté une règle et d'un journal-table:

CREATE TABLE serverjobrecord_log
(
  serverjobrecord_id integer,
  oldowner text,
  newowner text
) WITH ( OIDS=FALSE );


CREATE OR REPLACE RULE ownerrule AS ON UPDATE TO jobrecord
WHERE old.owner IS NOT NULL AND new.state = 1 
DO INSERT INTO jobrecord_log     (id, oldowner, newowner) 
    VALUES (new.id, old.owner, new.owner);

sans la mention LOCK TABLE serverjobrecord IN ACCESS EXCLUSIVE MODE; , la table de logarithme se remplit occasionnellement d'entrées, lorsque l'un des consommateurs a écrasé les valeurs d'un autre, mais en utilisant la déclaration de verrouillage, la table de journal reste vide: -)

-1
répondu code_talker 2011-06-30 08:04:55