Privacy Policy
Snippets index

  Celery: Full example of task integration in a Django project

Please also note AHAH with Django and jQuery technique usend in "frontend/views.py"

Backend

file "backend/models.py":

from django.db import models

class Lavorazione(models.Model):

    ...

    PENDING = 0
    RUNNING = 1
    SUCCESS = 2
    FAILURE = 3

    STATUS_CHOICES = (
        (PENDING, 'PENDING'),
        (RUNNING, 'RUNNING'),
        (SUCCESS, 'SUCCESS'),
        (FAILURE, 'FAILURE'),
    )

    ...
    status = models.IntegerField('Stato', choices=STATUS_CHOICES, default=PENDING, null=False, db_index=True)
    task_id = models.UUIDField('task_id', null=True, blank=True, db_index=True, unique=True)

    def set_status(self, status, commit=True):
        self.status = status
        if commit:
            self.save(update_fields=['status'])

    def run(self, async):
        from .tasks import LavorazioneTask
        if async:
            # TODO: verificare il comportamento della UI provocando questo errore:
            #task = LavorazioneTask.apply_async((self.id, ))
            task = LavorazioneTask.apply_async((str(self.id), ))
        else:
            task = LavorazioneTask.apply((str(self.id), ))
        self.task_id = task.id
        self.save(update_fields=['task_id'])
        return task

file "backend/tasks.py":

from celery.decorators import periodic_task, task
from celery.exceptions import Ignore
from .models import Lavorazione


def load_lavorazione(id_lavorazione):
    """
    Celery task might be faster than db transaction
    """
    for i in range(1, 10):
        try:
            lavorazione = Lavorazione.objects.get(id=id_lavorazione)
            return lavorazione
        #except Lavorazione.DoesNotExists:
        except:
            pass
        time.sleep(1)
    return None


@task
def LavorazioneTask(id_lavorazione):
    print('nuova lavorazione in corso: %s' % id_lavorazione)

    lavorazione = None

    try:
        # lavorazione = Lavorazione.objects.get(id=id_lavorazione)
        # watch out ! Celery task might be faster than db transaction
        lavorazione = load_lavorazione(id_lavorazione)
        lavorazione.set_status(Lavorazione.RUNNING)

        funzioni_ciclocollaudo = lavorazione.ciclo_collaudo.funzioniciclocollaudo_set
        nsteps = funzioni_ciclocollaudo.count()

        for step in range(0, nsteps):

            objstep = funzioni_ciclocollaudo.all()[step]
            funzione = objstep.funzione
            parametri = objstep.parametri

            info = {
                'step': step,
                'nsteps': nsteps,
                'progress': N,
            }

            print(info)
            print('Funzione: ' + str(funzione))
            print('Parametri: ' + parametri)

            # Simulate job exception
            #if str(funzione) == 'Errore':
            #    raise Exception('errore')

            LavorazioneTask.update_state(state='PROGRESS', meta=info)
            time.sleep(3)

        #LavorazioneTask.update_state(state='SUCCESS', meta={'progress': 100})
        lavorazione.set_status(Lavorazione.SUCCESS)
        print('nuova lavorazione completata.')

    except Exception, e:
        LavorazioneTask.update_state(state='FAILURE', meta={'progress': 100})
        lavorazione.set_status(Lavorazione.FAILURE)
        print('ERROR: %s' % (str(e), ))

        # ignore the task so no other state is recorded
        raise Ignore()

    return 1


def get_lavorazione_task_status(task_id):

    try:

        # If you have a task_id, this is how you query that task
        task = LavorazioneTask.AsyncResult(task_id)

        #progress = 0
        # status = task.status
        # if status == u'SUCCESS':
        #     progress = 100
        # elif status == u'FAILURE':
        #     progress = 0
        # elif status == 'PROGRESS':
        #     progress = task.info['progress']

        return {'status': task.status, 'info': task.info}  # , 'progress': progress}

    except:

        return {'status': 'FAILURE', 'info': {}}

file "backend/urls.py":

from django.conf.urls import url, patterns
from .views import start_lavorazione
from .views import check_task_status


urlpatterns = patterns('',
    url(r'^start-lavorazione/$', start_lavorazione, name="start_lavorazione"),
    url(r'^j/check_task_status/$', check_task_status, name="j_check_task_status"),
    ...
)

file "backend/views.py":

from django.views.decorators.csrf import csrf_exempt
from django.http import JsonResponse
from django.core.exceptions import PermissionDenied

@csrf_exempt
def start_lavorazione(request):

    print request.POST
    print request.is_ajax()
    print ''

    if not request.is_ajax():
        raise PermissionDenied

    id_ciclo_collaudo = request.POST.get('id_ciclo_collaudo', '')
    id_dispositivo = request.POST.get('id_dispositivo', '')

    try:

        lavorazione = Lavorazione.objects.create(
            dispositivo=Dispositivo.objects.get(id=id_dispositivo),
            ciclo_collaudo=CicloCollaudo.objects.get(id=id_ciclo_collaudo),
        )

        task = lavorazione.run(async=True)


        json_response = { 'task_id': task.id, }
        response_status = 200
    except Exception, e:
        json_response = str(e)
        response_status = 404

    return JsonResponse(json_response, status=response_status)

# Possibly unused and/or redundant with tasks/get_importazione_task_status()
def check_task_status(request):
    if not request.is_ajax():
        raise PermissionDenied
    try:
        task_id = request.REQUEST.get('task_id')
        async_result = AsyncResult(task_id)
        json_response = {
            'ready': async_result.ready(),
            'status': async_result.status,
            'info': async_result.info,
        }
        response_status = 200
    except Exception, e:
        json_response = str(e)
        response_status = 404
    return JsonResponse(json_response, status=response_status)

file "backend/management/command/do_esegui_lavorazione.py":

# -*- coding: UTF-8 -*-
from django.core.management.base import BaseCommand
#from django.core.management.base import CommandError
from backend.models import CicloCollaudo
from backend.models import Dispositivo
from backend.tasks import Lavorazione


class Command(BaseCommand):

    help = "Utilizzo: do_esegui_lavorazione <id_ciclo_collaudo> <id_dispositivo> ..."

    def add_arguments(self, parser):

        # Positional arguments
        parser.add_argument('ids', nargs='*', type=str)

        # Named (optional) arguments

    def list_models(self, model):
        for obj in model.objects.all():
            print '    %s (%s)' % (obj.id, obj.descrizione)

    def handle(self, *args, **options):

        ids = options['ids']
        if len(ids) != 2:
            print 'Specificare <id_ciclo_collaudo> e <id_dispositivo>'
            print ''
            print 'Cicli di collaudo disponibili:'
            self.list_models(CicloCollaudo)
            print 'Dispositivi:'
            self.list_models(Dispositivo)
        else:
            lavorazione = Lavorazione.objects.create(
                ciclo_collaudo=CicloCollaudo.objects.get(id=ids[0]),
                dispositivo=Dispositivo.objects.get(id=ids[1]),
            )
            task = lavorazione.run(async=False)
            print 'task: %s' % str(task.id)

Frontend

file "frontend/urls.py":

from django.conf.urls import url, patterns
from .views import active_task


urlpatterns = patterns('',
    url(r'^active_task/(?P<task_id>[^/]+)/$', active_task, name="active_task"),
    ...
)

file "frontend/views.py"

from django.contrib.auth.decorators import login_required
from backend.tasks import get_lavorazione_task_status
from django.http import JsonResponse
from backend.models import Lavorazione


# Adapted from:
# "Use html fragments and a decorator to add Ajax functionality to existing views"
# https://dzone.com/articles/ahah-django-and-jquery

@login_required
def active_task(request, task_id):

    task_data = get_lavorazione_task_status(task_id)
    context = {
        'task_id': task_id,
        'task_data': task_data,
    }

    # Se la view e' stata invocata via Ajax, facciamo il rendering dei
    # soli frammenti HTML che richiedono aggiornamento, e li inviamo al client,
    # che provvedera' poi ad aggiornare il DOM della pagina gia' visualizzata
    if request.is_ajax():

        html_content = render_to_string(
            "active_task_ajax_content.html",
            context
        )

        return JsonResponse({
            'html_content': html_content,
            'task_data': task_data,
        })

    lavorazione = Lavorazione.objects.get(task_id=task_id)
    context['lavorazione'] = lavorazione

    # Altrimenti costruiamo l'intera pagina
    return render(
        request,
        'active_task.html',
        context
    )

file "frontend/templates/active_task_ajax_content.html":

<div>
{{task_data}}
</div>

file "frontend/templates/active_task.html":

{% extends "base.html" %}

...

{% block extrajs %}
    ...
    <script type="text/javascript" src="{% static 'js/active_task.js' %}"></script>
    <script language="javascript">

        // Call UpdatePage, then start another shot
        function Tick() {
            UpdatePage(StartOneShotTimer);
        }

        // Start one shot timer
        function StartOneShotTimer() {
            var ms = {{config.TASK_POLLING_INTERVAL}};
            if (ms > 0) {
                window.setTimeout(Tick, ms);
            }
        }

        $(document).ready(function() {
            StartOneShotTimer();
        });

        $("#manually_update_page").click(function() {
            UpdatePage();
        });

    </script>

{% endblock extrajs %}


{% block sidebar %}
    ...
    <a id="manually_update_page" href="#">Aggiorna</a>
{% endblock sidebar %}


{% block content %}

    <h2>Lavorazione in corso</h2>

    <table class="datatable">
        <tr><td>Task</td><td>{{task_id}}</td></tr>
        <tr><td>Lavorazione</td><td>{{lavorazione}}</td></tr>
        <tr><td>Dispositivo</td><td>{{lavorazione.dispositivo}}</td></tr>
        <tr><td>Ciclo di Collaudo</td><td>{{lavorazione.ciclo_collaudo}}</td></tr>
    </table>

    <div id="html_content">
        {% include "active_task_ajax_content.html" %}
    </div>

{% endblock content %}

file "frontend/static/js/active_task.js":

// Update dynamic fragments of this page
function UpdatePage(callback=null) {
    $.ajax({
        type: "GET",
        url: window.location.pathname,
        data: {},
        cache: false,
        crossDomain: false,
        dataType: 'json'
    }).done(function(data) {
        $("#html_sidebar").html(data.html_sidebar);

        console.log('task_data: %o', data.task_data);

        var status = data.task_data.status;
        console.log('status: %o', status);

        if (status == "SUCCESS") {
            $('.funzioni .step').addClass('completed');
        }
        else if (status == "FAILURE") {
            $('.funzioni .step').addClass('error');
        }
        else if (status == "PROGRESS") {
            for (i = 0; i <= data.task_data.info.step; i++) {
                var selector = sprintf('.funzioni .step_%d', i);
                $(selector).addClass('completed');
            }
        }

        $("#html_content").html(data.html_content);
    }).always(function() {
        if (callback != null) {
            callback();
        }
    });
}