Pratique exemplaire pour sonder une file D'attente AWS SQS et supprimer les messages reçus de la file d'attente?

j'ai une file D'attente SQS qui est constamment peuplée par un consommateur de données et j'essaye MAINTENANT de créer le service qui va extraire ces données de SQS en utilisant le boto de Python.

la façon dont je l'ai conçu, c'est que je vais avoir 10-20 threads tous essayer de lire les messages à partir de la file D'attente SQS et puis faire ce qu'ils ont à faire sur les données (logique d'affaires), avant de retourner à la file d'attente pour obtenir le prochain lot de données une fois qu'ils sont faits. S'il n'y a pas de données, ils attendront les données sont disponibles.

j'ai deux domaines, je ne suis pas sûr au sujet de cette conception

  1. est-ce que c'est une question d'appeler receive_message() avec une longue valeur time_out et si rien n'est retourné dans les 20 secondes (maximum autorisé) alors réessayez? Ou y a-t-il une méthode de blocage qui ne retourne que lorsque les données sont disponibles?
  2. j'ai remarqué que lorsque je reçois un message, il n'est pas supprimé de la file d'attente, je dois recevoir un message et envoyer une autre demande après la réception pour les supprimer de la file d'attente? semble un peu exagéré.

Merci

12
demandé sur High6 2015-07-09 18:31:57

3 réponses

la capacité de sondage de longue durée du receive_message() la méthode est la façon la plus efficace de sonder les LP. Si cela revient sans aucun message, je recommande un court délai avant de revenir, surtout si vous avez plusieurs lecteurs. Vous pouvez même vouloir faire un délai incrémental de sorte que chaque lecture vide suivante attend un peu plus, juste pour ne pas finir par se faire étrangler par AWS.

Et oui, vous n'avez qu'à supprimer le message après l'avoir lu ou il réapparaît dans la file d'attente. Cela peut en fait être très utile dans le cas d'un travailleur lisant un message et échouant avant qu'il ne puisse traiter pleinement le message. Dans ce cas, il serait relu et relu par un autre travailleur. Vous voulez également vous assurer que le délai d'invisibilité des messages est réglé pour être assez long le travailleur a assez de temps pour traiter le message avant qu'il réapparaisse automatiquement sur la file d'attente. Si nécessaire, vos travailleurs peuvent ajuster le délai pendant qu'ils traitent si cela prend plus de temps que devrait.

13
répondu garnaat 2015-07-09 16:07:33

si vous voulez un moyen simple de configurer un écouteur qui inclut la suppression automatique des messages lorsqu'ils sont traités, et la poussée automatique des exceptions vers une file d'attente spécifiée, vous pouvez utiliser le pySqsListener paquet.

Vous pouvez définir un écouteur comme ceci:

from sqs_listener import SqsListener

class MyListener(SqsListener):
    def handle_message(self, body, attributes, messages_attributes):
        run_my_function(body['param1'], body['param2']

listener = MyListener('my-message-queue', 'my-error-queue')
listener.listen()

pour le moment, le paquet utilise des sondages courts, mais j'envisage de rendre le type de sondage configurable.

Disclaimer: je suis l'auteur de cette paquet.

4
répondu ygesher 2017-02-15 16:53:40

une autre option consiste à configurer une application worker en utilisant AWS Beanstalk comme décrit dans ce billet de blog.

au lieu d'utiliser boto3, votre application flask reçoit le message comme un objet json dans un message HTTP. Le chemin HTTP et le type de message en cours de paramétrage sont configurables dans L'onglet AWS Elastic Beanstalk Configuration:

enter image description here

le haricot élastique AWS a l'avantage supplémentaire de pouvoir mettez à l'échelle dynamique le nombre de travailleurs en fonction de la taille de votre file D'attente LP, ainsi que ses avantages en matière de gestion du déploiement.

est un exemple d'application que j'ai trouvé utile comme un modèle.

3
répondu nick_de_veaux 2017-11-04 20:08:43