Wie benutzt man Flask-SQLAlchemy in einer Celery-Aufgabe

Ich habe vor kurzem auf Celery 3.0 umgestellt. Davor habe ich Flaschen-Sellerie benutzt, um Sellerie mit Flasche zu integrieren. Obwohl es viele Probleme hatte, einige leistungsstarke Celery-Funktionalitäten zu verbergen, aber es erlaubte mir, den vollen Kontext von Flask App und besonders Flask-SQLAlchemy zu benutzen.

In meinem Hintergrund Aufgaben verarbeiten wir Daten und die SQLAlchemy ORM, um die Daten zu speichern. Der Betreuer von Flask-Sellerie hat die Unterstützung des Plugins gesenkt. Das Plugin war das Beizen der Flask-Instanz in der Aufgabe, so konnte ich vollen Zugriff auf SQLAlchemy haben.

Ich versuche, dieses Verhalten in meinem task.py Datei aber ohne Erfolg zu replizieren. Haben Sie Hinweise darauf, wie Sie das erreichen können?

4 Solutions collect form web for “Wie benutzt man Flask-SQLAlchemy in einer Celery-Aufgabe”

Update: Wir haben seitdem begonnen, eine bessere Art und Weise zu verwenden, um die Anwendung zu behandeln und auf einer pro-Task-Basis zu basieren, basierend auf dem Muster, das in der neueren Flaschendokumentation beschrieben wurde .

Extensions.py

import flask from flask.ext.sqlalchemy import SQLAlchemy from celery import Celery class FlaskCelery(Celery): def __init__(self, *args, **kwargs): super(FlaskCelery, self).__init__(*args, **kwargs) self.patch_task() if 'app' in kwargs: self.init_app(kwargs['app']) def patch_task(self): TaskBase = self.Task _celery = self class ContextTask(TaskBase): abstract = True def __call__(self, *args, **kwargs): if flask.has_app_context(): return TaskBase.__call__(self, *args, **kwargs) else: with _celery.app.app_context(): return TaskBase.__call__(self, *args, **kwargs) self.Task = ContextTask def init_app(self, app): self.app = app self.config_from_object(app.config) celery = FlaskCelery() db = SQLAlchemy() 

App.py

 from flask import Flask from extensions import celery, db def create_app(): app = Flask() #configure/initialize all your extensions db.init_app(app) celery.init_app(app) return app 

Sobald Sie Ihre App auf diese Weise eingerichtet haben, können Sie ausführen und verwenden Sie Sellerie, ohne es explizit aus einem Anwendungskontext ausführen zu müssen, da alle Ihre Aufgaben automatisch in einem Anwendungskontext ausgeführt werden, wenn nötig, und Sie haben nicht Um sich ausdrücklich um Post-Task-Teardown zu kümmern, was ein wichtiges Thema ist (siehe andere Antworten unten).

Alte Antwort unten, funktioniert immer noch, aber nicht so sauber eine Lösung

Ich ziehe es vor, alle Sellerie im Anwendungskontext zu starten, indem du eine separate Datei schreibst, die celery.start () mit dem Kontext der Anwendung aufruft. Dies bedeutet, dass Ihre Aufgaben-Datei nicht mit Kontext-Setup und Teardowns übersät werden muss. Es eignet sich auch gut für das "Bewerbungsfabrik" -Muster.

Extensions.py

 from from flask.ext.sqlalchemy import SQLAlchemy from celery import Celery db = SQLAlchemy() celery = Celery() 

Aufgaben.py

 from extensions import celery, db from flask.globals import current_app from celery.signals import task_postrun @celery.task def do_some_stuff(): current_app.logger.info("I have the application context") #you can now use the db object from extensions @task_postrun.connect def close_session(*args, **kwargs): # Flask SQLAlchemy will automatically create new sessions for you from # a scoped session factory, given that we are maintaining the same app # context, this ensures tasks have a fresh session (eg session errors # won't propagate across tasks) db.session.remove() 

App.py

 from extensions import celery, db def create_app(): app = Flask() #configure/initialize all your extensions db.init_app(app) celery.config_from_object(app.config) return app 

RunCelery.py

 from app import create_app from extensions import celery app = create_app() if __name__ == '__main__': with app.app_context(): celery.start() 

In deiner task.py-Datei folge folgendes:

 from main import create_app app = create_app() celery = Celery(__name__) celery.add_defaults(lambda: app.config) @celery.task def create_facet(project_id, **kwargs): with app.test_request_context(): # your code 

Ich habe die Antwort von Paul Gibbs mit zwei Unterschieden benutzt. Statt task_postrun habe ich worker_process_init benutzt. Und anstelle von .remove () habe ich db.session.expire_all () benutzt.

Ich bin nicht 100% sicher, aber von dem, was ich verstehe, wie dies funktioniert ist, wenn Celery einen Worker-Prozess schafft, werden alle geerbten / geteilten db-Sessions abgelaufen sein, und SQLAlchemy wird neue Sessions on Demand erstellen, die für diesen Worker-Prozess einzigartig sind.

Bisher scheint es mein Problem behoben zu haben. Mit der Lösung von Paul, als ein Arbeiter die Sitzung beendet und entfernt hat, hat ein anderer Arbeiter, der dieselbe Sitzung benutzt, immer noch seine Abfrage ausgeführt, so dass db.session.remove () die Verbindung geschlossen hat, während es benutzt wurde und gab mir eine "verlorene Verbindung zu MySQL Server während der Abfrage "Ausnahme.

Danke Paul für die Lenkung in die richtige Richtung!

Nevermind das hat nicht geklappt Ich habe am Ende ein Argument in meiner Flask App-Fabrik nicht laufen db.init_app (App), wenn Celery war es nennen. Stattdessen werden die Arbeiter es nennen, nachdem Celery sie gezwungen hat. Ich sehe jetzt mehrere Verbindungen in meiner MySQL-Prozessliste.

 from extensions import db from celery.signals import worker_process_init from flask import current_app @worker_process_init.connect def celery_worker_init_db(**_): db.init_app(current_app) 
 from flask import Flask from werkzeug.utils import import_string from celery.signals import worker_process_init, celeryd_init from flask_celery import Celery from src.app import config_from_env, create_app celery = Celery() def get_celery_conf(): config = import_string('src.settings') config = {k: getattr(config, k) for k in dir(config) if k.isupper()} config['BROKER_URL'] = config['CELERY_BROKER_URL'] return config @celeryd_init.connect def init_celeryd(conf=None, **kwargs): conf.update(get_celery_conf()) @worker_process_init.connect def init_celery_flask_app(**kwargs): app = create_app() app.app_context().push() 
  • Update celery config bei celeryd init
  • Verwenden Sie Ihre Flasche App-Fabrik, um alle Kolben-Erweiterungen, einschließlich SQLAlchemy Erweiterung zu initialisieren.

Auf diese Weise können wir die Datenbankverbindung per Arbeiter pflegen.

Wenn du deine Aufgabe unter dem Flaschen-Kontext Task.__call__ , kannst du die Task.__call__ :

 class SmartTask(Task): abstract = True def __call__(self, *_args, **_kwargs): with self.app.flask_app.app_context(): with self.app.flask_app.test_request_context(): result = super(SmartTask, self).__call__(*_args, **_kwargs) return result class SmartCelery(Celery): def init_app(self, app): super(SmartCelery, self).init_app(app) self.Task = SmartTask 
  • Sellerie Warum bleibt die Aufgabe in der Warteschlange
  • Python celery multi unregistrierte Aufgabe
  • Führen Sie einen Sellerie-Arbeiter aus, der mit dem Django-Test-DB verbunden ist
  • Django Celery Periodische Aufgaben laufen aber RabbitMQ Warteschlangen werden nicht verbraucht
  • Sellerie gibt Verbindung zurücksetzen durch Peer
  • Sellerie und Django - Kein Modul namens 'django'
  • "Unbekannte Aufgabe" Fehler bei Celery Flower bei der Buchung einer neuen Aufgabe
  • Gleiche Aufgabe mehrmals ausgeführt
  • Wie zu sagen, wenn django Sellerie Aufgabe ordnungsgemäß läuft scrapy Spinne
  • ImproperlyConfigured ("settings.DATABASES ist falsch konfiguriert.") Fehler beim Versuch, Django einzurichten
  • Django Celery Logging Best Practice
  • Python ist die beste Programmiersprache der Welt.