Comment utiliser Flask-SQLAlchemy dans une tâche de céleri

je suis récemment passé à Celery 3.0. Avant cela, j'utilisais Flasque-Céleri afin d'intégrer le céleri à la fiole. Bien qu'il ait eu de nombreux problèmes comme cacher quelques fonctionnalités céleri puissant, mais il m'a permis d'utiliser le contexte complet de Flask app et surtout Flask-SQLAlchemy.

dans Mes tâches d'arrière-plan, je traite des données et L'ORM de SQLAlchemy pour stocker les données. Le responsable de Flask-Celery a abandonné le support du plugin. Le plugin a été le décapage de la Flasque instance dans la tâche pour que je puisse avoir un accès complet à SQLAlchemy.

j'essaie de répliquer ce comportement dans mon tasks.py dossier mais sans succès. Avez-vous des conseils sur la façon de réaliser cet objectif?

27
demandé sur Paco 2012-08-21 00:38:53

4 réponses

mise à jour: nous avons depuis commencé à utiliser une meilleure façon de gérer l'application démonté et mis en place sur une base par tâche, sur la base du modèle décrit dans la documentation la plus récente de la fiole.

extensions.py

import flask
from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery

class FlaskCelery(Celery):

    def __init__(self, *args, **kwargs):

        super(FlaskCelery, self).__init__(*args, **kwargs)
        self.patch_task()

        if 'app' in kwargs:
            self.init_app(kwargs['app'])

    def patch_task(self):
        TaskBase = self.Task
        _celery = self

        class ContextTask(TaskBase):
            abstract = True

            def __call__(self, *args, **kwargs):
                if flask.has_app_context():
                    return TaskBase.__call__(self, *args, **kwargs)
                else:
                    with _celery.app.app_context():
                        return TaskBase.__call__(self, *args, **kwargs)

        self.Task = ContextTask

    def init_app(self, app):
        self.app = app
        self.config_from_object(app.config)


celery = FlaskCelery()
db = SQLAlchemy()

app.py

from flask import Flask
from extensions import celery, db

def create_app():
    app = Flask()

    #configure/initialize all your extensions
    db.init_app(app)
    celery.init_app(app)

    return app

une fois que vous avez configuré votre application de cette façon, vous pouvez exécuter et utiliser celery sans avoir à l'exécuter explicitement dans un contexte d'application, car toutes vos tâches seront automatiquement exécutées dans un contexte de l'application si nécessaire, et vous n'avez pas à vous soucier explicitement du démontage post-tâche, qui est une question importante à gérer (voir les autres réponses ci-dessous).

Vieux-réponse ci-dessous, fonctionne toujours, mais pas aussi propre une solution

je préfère courir tous de céleri dans le contexte de l'application en créant un fichier séparé qui invoque le céleri.start () avec le contexte de l'application. Cela signifie que votre fichier de tâches n'a pas à être jonché de configuration de contexte et teardowns. Elle se prête également bien au modèle "application factory".

extensions.py

from from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery

db = SQLAlchemy()
celery = Celery()

tasks.py

from extensions import celery, db
from flask.globals import current_app
from celery.signals import task_postrun

@celery.task
def do_some_stuff():
    current_app.logger.info("I have the application context")
    #you can now use the db object from extensions

@task_postrun.connect
def close_session(*args, **kwargs):
    # Flask SQLAlchemy will automatically create new sessions for you from 
    # a scoped session factory, given that we are maintaining the same app
    # context, this ensures tasks have a fresh session (e.g. session errors 
    # won't propagate across tasks)
    db.session.remove()

app.py

from extensions import celery, db

def create_app():
    app = Flask()

    #configure/initialize all your extensions
    db.init_app(app)
    celery.config_from_object(app.config)

    return app

RunCelery.py

from app import create_app
from extensions import celery

app = create_app()

if __name__ == '__main__':
    with app.app_context():
        celery.start()
54
répondu Paul Gibbs 2018-09-17 02:48:01

Dans votre tasks.py fichier, procédez de la manière suivante:

from main import create_app
app = create_app()

celery = Celery(__name__)
celery.add_defaults(lambda: app.config)

@celery.task
def create_facet(project_id, **kwargs):
    with app.test_request_context():
       # your code
5
répondu PanosJee 2012-08-21 12:42:08

j'ai utilisé réponse de Paul Gibbs avec deux différences. Au lieu de task_postrun, j'ai utilisé worker_process_init. Et au lieu de .remove (), j'ai utilisé db.session.expire_all ().

Je ne suis pas sûr à 100%, mais d'après ce que j'ai compris de la façon dont cela fonctionne c'est quand Celery crée un processus worker, toutes les sessions db héritées/partagées seront expirées, et SQLAlchemy créera de nouvelles sessions à la demande uniques à ce processus worker.

Donc à présent, il semble avoir corrigé mon problème. Avec la solution de Paul, lorsqu'un ouvrier termine et supprime la session, un autre ouvrier utilisant la même session exécute toujours sa requête, donc db.session.remove () a fermé la connexion pendant qu'elle était utilisée, me donnant une exception "connexion perdue au serveur MySQL pendant la requête".

Merci Paul de m'avoir guidé dans la bonne direction!

peu importe que ça n'ait pas marché. J'ai fini par avoir une dispute dans ma flasque. app factory pour ne pas exécuter de db.init_app (app) si Celery l'appelait. Au lieu de cela, les ouvriers l'appelleront après les fourchettes de céleri. Je vois maintenant plusieurs connexions dans ma liste de processus MySQL.

from extensions import db
from celery.signals import worker_process_init
from flask import current_app

@worker_process_init.connect
def celery_worker_init_db(**_):
    db.init_app(current_app)
5
répondu Robpol86 2017-05-23 11:46:22
from flask import Flask
from werkzeug.utils import import_string
from celery.signals import worker_process_init, celeryd_init
from flask_celery import Celery
from src.app import config_from_env, create_app

celery = Celery()

def get_celery_conf():
    config = import_string('src.settings')
    config = {k: getattr(config, k) for k in dir(config) if k.isupper()}
    config['BROKER_URL'] = config['CELERY_BROKER_URL']
    return config

@celeryd_init.connect
def init_celeryd(conf=None, **kwargs):
    conf.update(get_celery_conf())

@worker_process_init.connect
def init_celery_flask_app(**kwargs):
    app = create_app()
    app.app_context().push()
  • mise à Jour de céleri config celeryd init
  • utilisez votre flasque app factory pour initialiser toutes les extensions de flasque, y compris L'extension SQLAlchemy.

En faisant cela, nous sommes en mesure de maintenir la connexion de base de données par travailleur.

si vous voulez exécuter votre tâche dans un contexte flasque, vous pouvez sous-classe Task.__call__:

class SmartTask(Task):

    abstract = True

    def __call__(self, *_args, **_kwargs):
        with self.app.flask_app.app_context():
            with self.app.flask_app.test_request_context():
                result = super(SmartTask, self).__call__(*_args, **_kwargs)
            return result

class SmartCelery(Celery):

    def init_app(self, app):
        super(SmartCelery, self).init_app(app)
        self.Task = SmartTask
2
répondu soasme 2016-06-06 06:42:54