Comment utiliser Flask-SQLAlchemy dans une tâche de céleri
je suis récemment passé à Celery 3.0. Avant cela, j'utilisais Flasque-Céleri afin d'intégrer le céleri à la fiole. Bien qu'il ait eu de nombreux problèmes comme cacher quelques fonctionnalités céleri puissant, mais il m'a permis d'utiliser le contexte complet de Flask app et surtout Flask-SQLAlchemy.
dans Mes tâches d'arrière-plan, je traite des données et L'ORM de SQLAlchemy pour stocker les données. Le responsable de Flask-Celery a abandonné le support du plugin. Le plugin a été le décapage de la Flasque instance dans la tâche pour que je puisse avoir un accès complet à SQLAlchemy.
j'essaie de répliquer ce comportement dans mon tasks.py dossier mais sans succès. Avez-vous des conseils sur la façon de réaliser cet objectif?
4 réponses
mise à jour: nous avons depuis commencé à utiliser une meilleure façon de gérer l'application démonté et mis en place sur une base par tâche, sur la base du modèle décrit dans la documentation la plus récente de la fiole.
extensions.py
import flask
from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery
class FlaskCelery(Celery):
def __init__(self, *args, **kwargs):
super(FlaskCelery, self).__init__(*args, **kwargs)
self.patch_task()
if 'app' in kwargs:
self.init_app(kwargs['app'])
def patch_task(self):
TaskBase = self.Task
_celery = self
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
if flask.has_app_context():
return TaskBase.__call__(self, *args, **kwargs)
else:
with _celery.app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
self.Task = ContextTask
def init_app(self, app):
self.app = app
self.config_from_object(app.config)
celery = FlaskCelery()
db = SQLAlchemy()
app.py
from flask import Flask
from extensions import celery, db
def create_app():
app = Flask()
#configure/initialize all your extensions
db.init_app(app)
celery.init_app(app)
return app
une fois que vous avez configuré votre application de cette façon, vous pouvez exécuter et utiliser celery sans avoir à l'exécuter explicitement dans un contexte d'application, car toutes vos tâches seront automatiquement exécutées dans un contexte de l'application si nécessaire, et vous n'avez pas à vous soucier explicitement du démontage post-tâche, qui est une question importante à gérer (voir les autres réponses ci-dessous).
Vieux-réponse ci-dessous, fonctionne toujours, mais pas aussi propre une solution
je préfère courir tous de céleri dans le contexte de l'application en créant un fichier séparé qui invoque le céleri.start () avec le contexte de l'application. Cela signifie que votre fichier de tâches n'a pas à être jonché de configuration de contexte et teardowns. Elle se prête également bien au modèle "application factory".
extensions.py
from from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery
db = SQLAlchemy()
celery = Celery()
tasks.py
from extensions import celery, db
from flask.globals import current_app
from celery.signals import task_postrun
@celery.task
def do_some_stuff():
current_app.logger.info("I have the application context")
#you can now use the db object from extensions
@task_postrun.connect
def close_session(*args, **kwargs):
# Flask SQLAlchemy will automatically create new sessions for you from
# a scoped session factory, given that we are maintaining the same app
# context, this ensures tasks have a fresh session (e.g. session errors
# won't propagate across tasks)
db.session.remove()
app.py
from extensions import celery, db
def create_app():
app = Flask()
#configure/initialize all your extensions
db.init_app(app)
celery.config_from_object(app.config)
return app
RunCelery.py
from app import create_app
from extensions import celery
app = create_app()
if __name__ == '__main__':
with app.app_context():
celery.start()
Dans votre tasks.py fichier, procédez de la manière suivante:
from main import create_app
app = create_app()
celery = Celery(__name__)
celery.add_defaults(lambda: app.config)
@celery.task
def create_facet(project_id, **kwargs):
with app.test_request_context():
# your code
j'ai utilisé réponse de Paul Gibbs avec deux différences. Au lieu de task_postrun, j'ai utilisé worker_process_init. Et au lieu de .remove (), j'ai utilisé db.session.expire_all ().
Je ne suis pas sûr à 100%, mais d'après ce que j'ai compris de la façon dont cela fonctionne c'est quand Celery crée un processus worker, toutes les sessions db héritées/partagées seront expirées, et SQLAlchemy créera de nouvelles sessions à la demande uniques à ce processus worker.
Donc à présent, il semble avoir corrigé mon problème. Avec la solution de Paul, lorsqu'un ouvrier termine et supprime la session, un autre ouvrier utilisant la même session exécute toujours sa requête, donc db.session.remove () a fermé la connexion pendant qu'elle était utilisée, me donnant une exception "connexion perdue au serveur MySQL pendant la requête".
Merci Paul de m'avoir guidé dans la bonne direction!
peu importe que ça n'ait pas marché. J'ai fini par avoir une dispute dans ma flasque. app factory pour ne pas exécuter de db.init_app (app) si Celery l'appelait. Au lieu de cela, les ouvriers l'appelleront après les fourchettes de céleri. Je vois maintenant plusieurs connexions dans ma liste de processus MySQL.
from extensions import db
from celery.signals import worker_process_init
from flask import current_app
@worker_process_init.connect
def celery_worker_init_db(**_):
db.init_app(current_app)
from flask import Flask
from werkzeug.utils import import_string
from celery.signals import worker_process_init, celeryd_init
from flask_celery import Celery
from src.app import config_from_env, create_app
celery = Celery()
def get_celery_conf():
config = import_string('src.settings')
config = {k: getattr(config, k) for k in dir(config) if k.isupper()}
config['BROKER_URL'] = config['CELERY_BROKER_URL']
return config
@celeryd_init.connect
def init_celeryd(conf=None, **kwargs):
conf.update(get_celery_conf())
@worker_process_init.connect
def init_celery_flask_app(**kwargs):
app = create_app()
app.app_context().push()
- mise à Jour de céleri config celeryd init
- utilisez votre flasque app factory pour initialiser toutes les extensions de flasque, y compris L'extension SQLAlchemy.
En faisant cela, nous sommes en mesure de maintenir la connexion de base de données par travailleur.
si vous voulez exécuter votre tâche dans un contexte flasque, vous pouvez sous-classe Task.__call__
:
class SmartTask(Task):
abstract = True
def __call__(self, *_args, **_kwargs):
with self.app.flask_app.app_context():
with self.app.flask_app.test_request_context():
result = super(SmartTask, self).__call__(*_args, **_kwargs)
return result
class SmartCelery(Celery):
def init_app(self, app):
super(SmartCelery, self).init_app(app)
self.Task = SmartTask