Celery: Full example of task integration in a Django project ¶
Please also note AHAH with Django and jQuery technique usend in "frontend/views.py"
Backend
file "backend/models.py":
from django.db import models class Lavorazione(models.Model): ... PENDING = 0 RUNNING = 1 SUCCESS = 2 FAILURE = 3 STATUS_CHOICES = ( (PENDING, 'PENDING'), (RUNNING, 'RUNNING'), (SUCCESS, 'SUCCESS'), (FAILURE, 'FAILURE'), ) ... status = models.IntegerField('Stato', choices=STATUS_CHOICES, default=PENDING, null=False, db_index=True) task_id = models.UUIDField('task_id', null=True, blank=True, db_index=True, unique=True) def set_status(self, status, commit=True): self.status = status if commit: self.save(update_fields=['status']) def run(self, async): from .tasks import LavorazioneTask if async: # TODO: verificare il comportamento della UI provocando questo errore: #task = LavorazioneTask.apply_async((self.id, )) task = LavorazioneTask.apply_async((str(self.id), )) else: task = LavorazioneTask.apply((str(self.id), )) self.task_id = task.id self.save(update_fields=['task_id']) return task
file "backend/tasks.py":
from celery.decorators import periodic_task, task from celery.exceptions import Ignore from .models import Lavorazione def load_lavorazione(id_lavorazione): """ Celery task might be faster than db transaction """ for i in range(1, 10): try: lavorazione = Lavorazione.objects.get(id=id_lavorazione) return lavorazione #except Lavorazione.DoesNotExists: except: pass time.sleep(1) return None @task def LavorazioneTask(id_lavorazione): print('nuova lavorazione in corso: %s' % id_lavorazione) lavorazione = None try: # lavorazione = Lavorazione.objects.get(id=id_lavorazione) # watch out ! Celery task might be faster than db transaction lavorazione = load_lavorazione(id_lavorazione) lavorazione.set_status(Lavorazione.RUNNING) funzioni_ciclocollaudo = lavorazione.ciclo_collaudo.funzioniciclocollaudo_set nsteps = funzioni_ciclocollaudo.count() for step in range(0, nsteps): objstep = funzioni_ciclocollaudo.all()[step] funzione = objstep.funzione parametri = objstep.parametri info = { 'step': step, 'nsteps': nsteps, 'progress': N, } print(info) print('Funzione: ' + str(funzione)) print('Parametri: ' + parametri) # Simulate job exception #if str(funzione) == 'Errore': # raise Exception('errore') LavorazioneTask.update_state(state='PROGRESS', meta=info) time.sleep(3) #LavorazioneTask.update_state(state='SUCCESS', meta={'progress': 100}) lavorazione.set_status(Lavorazione.SUCCESS) print('nuova lavorazione completata.') except Exception, e: LavorazioneTask.update_state(state='FAILURE', meta={'progress': 100}) lavorazione.set_status(Lavorazione.FAILURE) print('ERROR: %s' % (str(e), )) # ignore the task so no other state is recorded raise Ignore() return 1 def get_lavorazione_task_status(task_id): try: # If you have a task_id, this is how you query that task task = LavorazioneTask.AsyncResult(task_id) #progress = 0 # status = task.status # if status == u'SUCCESS': # progress = 100 # elif status == u'FAILURE': # progress = 0 # elif status == 'PROGRESS': # progress = task.info['progress'] return {'status': task.status, 'info': task.info} # , 'progress': progress} except: return {'status': 'FAILURE', 'info': {}}
file "backend/urls.py":
from django.conf.urls import url, patterns from .views import start_lavorazione from .views import check_task_status urlpatterns = patterns('', url(r'^start-lavorazione/$', start_lavorazione, name="start_lavorazione"), url(r'^j/check_task_status/$', check_task_status, name="j_check_task_status"), ... )
file "backend/views.py":
from django.views.decorators.csrf import csrf_exempt from django.http import JsonResponse from django.core.exceptions import PermissionDenied @csrf_exempt def start_lavorazione(request): print request.POST print request.is_ajax() print '' if not request.is_ajax(): raise PermissionDenied id_ciclo_collaudo = request.POST.get('id_ciclo_collaudo', '') id_dispositivo = request.POST.get('id_dispositivo', '') try: lavorazione = Lavorazione.objects.create( dispositivo=Dispositivo.objects.get(id=id_dispositivo), ciclo_collaudo=CicloCollaudo.objects.get(id=id_ciclo_collaudo), ) task = lavorazione.run(async=True) json_response = { 'task_id': task.id, } response_status = 200 except Exception, e: json_response = str(e) response_status = 404 return JsonResponse(json_response, status=response_status) # Possibly unused and/or redundant with tasks/get_importazione_task_status() def check_task_status(request): if not request.is_ajax(): raise PermissionDenied try: task_id = request.REQUEST.get('task_id') async_result = AsyncResult(task_id) json_response = { 'ready': async_result.ready(), 'status': async_result.status, 'info': async_result.info, } response_status = 200 except Exception, e: json_response = str(e) response_status = 404 return JsonResponse(json_response, status=response_status)
file "backend/management/command/do_esegui_lavorazione.py":
# -*- coding: UTF-8 -*- from django.core.management.base import BaseCommand #from django.core.management.base import CommandError from backend.models import CicloCollaudo from backend.models import Dispositivo from backend.tasks import Lavorazione class Command(BaseCommand): help = "Utilizzo: do_esegui_lavorazione <id_ciclo_collaudo> <id_dispositivo> ..." def add_arguments(self, parser): # Positional arguments parser.add_argument('ids', nargs='*', type=str) # Named (optional) arguments def list_models(self, model): for obj in model.objects.all(): print ' %s (%s)' % (obj.id, obj.descrizione) def handle(self, *args, **options): ids = options['ids'] if len(ids) != 2: print 'Specificare <id_ciclo_collaudo> e <id_dispositivo>' print '' print 'Cicli di collaudo disponibili:' self.list_models(CicloCollaudo) print 'Dispositivi:' self.list_models(Dispositivo) else: lavorazione = Lavorazione.objects.create( ciclo_collaudo=CicloCollaudo.objects.get(id=ids[0]), dispositivo=Dispositivo.objects.get(id=ids[1]), ) task = lavorazione.run(async=False) print 'task: %s' % str(task.id)
Frontend
file "frontend/urls.py":
from django.conf.urls import url, patterns from .views import active_task urlpatterns = patterns('', url(r'^active_task/(?P<task_id>[^/]+)/$', active_task, name="active_task"), ... )
file "frontend/views.py"
from django.contrib.auth.decorators import login_required from backend.tasks import get_lavorazione_task_status from django.http import JsonResponse from backend.models import Lavorazione # Adapted from: # "Use html fragments and a decorator to add Ajax functionality to existing views" # https://dzone.com/articles/ahah-django-and-jquery @login_required def active_task(request, task_id): task_data = get_lavorazione_task_status(task_id) context = { 'task_id': task_id, 'task_data': task_data, } # Se la view e' stata invocata via Ajax, facciamo il rendering dei # soli frammenti HTML che richiedono aggiornamento, e li inviamo al client, # che provvedera' poi ad aggiornare il DOM della pagina gia' visualizzata if request.is_ajax(): html_content = render_to_string( "active_task_ajax_content.html", context ) return JsonResponse({ 'html_content': html_content, 'task_data': task_data, }) lavorazione = Lavorazione.objects.get(task_id=task_id) context['lavorazione'] = lavorazione # Altrimenti costruiamo l'intera pagina return render( request, 'active_task.html', context )
file "frontend/templates/active_task_ajax_content.html":
<div> {{task_data}} </div>
file "frontend/templates/active_task.html":
{% extends "base.html" %} ... {% block extrajs %} ... <script type="text/javascript" src="{% static 'js/active_task.js' %}"></script> <script language="javascript"> // Call UpdatePage, then start another shot function Tick() { UpdatePage(StartOneShotTimer); } // Start one shot timer function StartOneShotTimer() { var ms = {{config.TASK_POLLING_INTERVAL}}; if (ms > 0) { window.setTimeout(Tick, ms); } } $(document).ready(function() { StartOneShotTimer(); }); $("#manually_update_page").click(function() { UpdatePage(); }); </script> {% endblock extrajs %} {% block sidebar %} ... <a id="manually_update_page" href="#">Aggiorna</a> {% endblock sidebar %} {% block content %} <h2>Lavorazione in corso</h2> <table class="datatable"> <tr><td>Task</td><td>{{task_id}}</td></tr> <tr><td>Lavorazione</td><td>{{lavorazione}}</td></tr> <tr><td>Dispositivo</td><td>{{lavorazione.dispositivo}}</td></tr> <tr><td>Ciclo di Collaudo</td><td>{{lavorazione.ciclo_collaudo}}</td></tr> </table> <div id="html_content"> {% include "active_task_ajax_content.html" %} </div> {% endblock content %}
file "frontend/static/js/active_task.js":
// Update dynamic fragments of this page function UpdatePage(callback=null) { $.ajax({ type: "GET", url: window.location.pathname, data: {}, cache: false, crossDomain: false, dataType: 'json' }).done(function(data) { $("#html_sidebar").html(data.html_sidebar); console.log('task_data: %o', data.task_data); var status = data.task_data.status; console.log('status: %o', status); if (status == "SUCCESS") { $('.funzioni .step').addClass('completed'); } else if (status == "FAILURE") { $('.funzioni .step').addClass('error'); } else if (status == "PROGRESS") { for (i = 0; i <= data.task_data.info.step; i++) { var selector = sprintf('.funzioni .step_%d', i); $(selector).addClass('completed'); } } $("#html_content").html(data.html_content); }).always(function() { if (callback != null) { callback(); } }); }