Commit a3c5eef7 authored by Fabien  MAREUIL's avatar Fabien MAREUIL
Browse files

Class Task, monitoring celery task

parent 9b8fa771
Pipeline #26478 failed with stage
in 6 minutes and 3 seconds
......@@ -27,8 +27,10 @@ from .models import (
Ppi,
ProteinDomainComplex,
Contribution,
Job,
)
from .tasks import launch_validate_contributions
from django.urls import reverse
class ViewOnSiteModelAdmin(admin.ModelAdmin):
......@@ -60,6 +62,68 @@ class ViewOnSiteModelAdmin(admin.ModelAdmin):
)
@admin.register(Job)
class JobModelAdmin(admin.ModelAdmin):
date_hierarchy = "task_result__date_done"
list_display = (
"task_result_task_name",
"task_result_task_id",
"task_result_status",
"task_result_date_created",
"task_result_date_done",
)
list_filter = (
"task_result__status",
"task_result__date_done",
"task_result__task_name",
)
readonly_fields = (
"task_result_task_name",
"task_result_task_id",
"task_result_status",
"task_result_date_created",
"task_result_date_done",
)
search_fields = (
"task_result__task_name",
"task_result__task_id",
"task_result__status",
)
fields = (
("task_result_task_name", "task_result_task_id"),
"task_result_status",
("task_result_date_created", "task_result_date_done"),
("std_out", "std_err"),
)
def task_result_task_id(self, x):
return x.task_result.task_id
def task_result_task_name(self, x):
return x.task_result.task_name
def task_result_date_done(self, x):
return x.task_result.date_done
def task_result_status(self, x):
return x.task_result.status
def task_result_date_created(self, x):
return x.task_result.date_created
# def task_link(self, x):
# href = reverse("admin:task-results", args=[x.task_result.id])
# print(href)
# return '<a href="/admin/django_celery_results/taskresult/{}/change/">{}</a>'.format(x.task_result.id, x.task_result.task_id)
# task_link.allow_tags = True
task_result_task_id.short_description = "task_id"
task_result_task_name.short_description = "task_name"
task_result_date_done.short_description = "date_done"
task_result_status.short_description = "status"
task_result_date_created.short_description = "date_created"
@admin.register(Bibliography)
class BibliographyModelAdmin(ViewOnSiteModelAdmin):
list_display = ("authors_list", "title", "journal_name", "biblio_year", "id_source")
......
# Generated by Django 2.2.1 on 2020-03-18 23:11
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('ippidb', '0040_compound_families'),
]
operations = [
migrations.CreateModel(
name='Job',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.CharField(help_text='task name', max_length=125)),
('status', models.CharField(choices=[('INIT', 'init'), ('RUNNING', 'running'), ('SUCCESS', 'success'), ('ERROR', 'error')], db_index=True, default='INIT', help_text='The status of this job.', max_length=30, verbose_name='job status')),
('std_out', models.TextField(blank=True, help_text='task standard output', null=True)),
('std_err', models.TextField(blank=True, help_text='task error output', null=True)),
],
),
]
# Generated by Django 2.2.1 on 2020-03-19 10:32
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('ippidb', '0041_job'),
]
operations = [
migrations.AlterField(
model_name='job',
name='std_err',
field=models.TextField(blank=True, default='', help_text='task error output', null=True),
),
migrations.AlterField(
model_name='job',
name='std_out',
field=models.TextField(blank=True, default='', help_text='task standard output', null=True),
),
]
# Generated by Django 2.2.1 on 2020-03-20 17:37
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('ippidb', '0042_auto_20200319_1032'),
]
operations = [
migrations.AddField(
model_name='job',
name='task_id',
field=models.CharField(blank=True, help_text='task id', max_length=50, null=True),
),
]
# Generated by Django 2.2.1 on 2020-03-23 11:10
from django.db import migrations, models
import django.db.models.deletion
import django.utils.timezone
class Migration(migrations.Migration):
dependencies = [
('django_celery_results', '0007_remove_taskresult_hidden'),
('ippidb', '0043_job_task_id'),
]
operations = [
migrations.RemoveField(
model_name='job',
name='name',
),
migrations.RemoveField(
model_name='job',
name='status',
),
migrations.RemoveField(
model_name='job',
name='task_id',
),
migrations.AddField(
model_name='job',
name='task_result',
field=models.OneToOneField(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='django_celery_results.TaskResult'),
preserve_default=False,
),
]
# Generated by Django 2.2.1 on 2020-03-23 12:35
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('ippidb', '0044_auto_20200323_1110'),
]
operations = [
migrations.AlterField(
model_name='job',
name='task_result',
field=models.OneToOneField(default=None, on_delete=django.db.models.deletion.CASCADE, to='django_celery_results.TaskResult'),
),
]
......@@ -6,6 +6,7 @@ from __future__ import unicode_literals
import operator
import re
import sys
from django.conf import settings
from django.contrib.auth import get_user_model
......@@ -16,6 +17,9 @@ from django.db.models import Max, Count, F, Q, Case, When, Subquery, OuterRef
from django.db.models.functions import Cast
from django.urls import reverse
from django.utils.translation import ugettext_lazy as _
from django_celery_results.models import TaskResult
from django.dispatch import receiver
from django.db.models.signals import post_save
from .utils import FingerPrinter, smi2inchi, smi2inchikey
from .ws import (
......@@ -1691,3 +1695,33 @@ def update_compound_cached_properties(compounds_queryset=None):
.values("_lipinsky")[:1]
)
)
class Job(models.Model):
task_result = models.OneToOneField(
TaskResult, on_delete=models.CASCADE, default=None
)
std_out = models.TextField(
null=True, default="", blank=True, help_text="task standard output"
)
std_err = models.TextField(
null=True, default="", blank=True, help_text="task error output"
)
def write(self, message, output="std_out"):
if output == "std_out":
print(message, file=sys.stdout)
self.std_out += "\n{}".format(message)
self.save()
elif output == "std_err":
print(message, file=sys.stderr)
self.std_err += "\n{}".format(message)
self.save()
else:
raise Exception("output doesn't exist")
@receiver(post_save, sender=TaskResult)
def post_save_taskresult(sender, instance, created, *args, **kwargs):
if created:
Job.objects.create(task_result=instance)
......@@ -6,9 +6,12 @@ import base64
import itertools
from celery import shared_task
import seaborn as sns
from celery import shared_task, task, states
from ippisite.decorator import IppidbTask
import matplotlib.pyplot as plt
plt.switch_backend("Agg")
import seaborn as sns
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
......@@ -23,6 +26,7 @@ from .models import (
update_compound_cached_properties,
LeLleBiplotData,
PcaBiplotData,
Job,
)
from .utils import smi2sdf
from .gx import GalaxyCompoundPropertiesRunner
......@@ -311,6 +315,20 @@ def validate_compounds(compound_ids):
generate_pca_plot()
@task(base=IppidbTask, bind=True)
def launch_test_command_caching(self):
import time
import random
self.update_job(std_out="Before first sleep, state={}".format(self.state))
time.sleep(30)
self.update_state(state=states.STARTED)
self.update_job(std_out="After first sleep, state={}".format(self.state))
num = random.random()
if num > 0.5:
raise Exception("ERROR: {} is greater than 0.5".format(num))
@shared_task
def launch_validate_contributions(contribution_ids):
"""
......
......@@ -2,11 +2,11 @@ from django.contrib import admin
from django.urls import path
from django.shortcuts import redirect
from django.contrib import messages
from ippidb.tasks import (
launch_compound_properties_caching,
launch_drugbank_similarity_computing,
launch_plots_computing,
launch_test_command_caching,
)
......@@ -35,9 +35,19 @@ class IppidbAdmin(admin.AdminSite):
"launch_plots_computing/",
self.admin_view(self.launch_plots_computing_view),
),
path("launch_test_command/", self.admin_view(self.launch_test_command),),
]
return my_urls + urls
def launch_test_command(self, request):
"""
This view launches the task to test jobs
"""
task = launch_test_command_caching.delay()
print(task.state)
messages.add_message(request, messages.INFO, "Test job launched")
return redirect("/admin/")
def launch_compound_properties_caching_view(self, request):
"""
This view launches the task to perform, for all already validated compounds,
......
from ippidb.models import Job
from celery import Task, states
from django_celery_results.models import TaskResult
class AlreadyExistError(Exception):
pass
class IppidbTask(Task):
"""
Ippidb custom task
"""
def __call__(self, *args, **kwargs):
"""In celery task this function call the run method, here you can
set some environment variable before the run of the task"""
tasks = TaskResult.objects.filter(
task_name=self.request.task,
status__in=[states.STARTED, states.PENDING, states.RETRY, states.RECEIVED],
)
count_tasks = tasks.count()
if not count_tasks:
self.update_state(state=states.PENDING)
return self.run(*args, **kwargs)
else:
message_exc = "Job {} in state {} already exist".format(
tasks[0].task_name, tasks[0].status
)
raise AlreadyExistError(message_exc)
def update_job(self, std_out=None, std_err=None):
job = Job.objects.get(task_result__task_id=self.task_id)
if std_out:
job.write(std_out, output="std_out")
elif std_err:
job.write(std_err, output="std_err")
def on_success(self, retval, task_id, args, kwargs):
self.update_job(std_out="SUCCESS")
super(IppidbTask, self).on_success(retval, task_id, args, kwargs)
def on_failure(self, exc, task_id, args, kwargs, einfo):
self.update_job(std_err=exc)
super(IppidbTask, self).on_failure(exc, task_id, args, kwargs, einfo)
def update_state(self, task_id=None, state=None, meta=None, **kwargs):
self.state = state
if task_id is None:
self.task_id = self.request.id
else:
self.task_id = task_id
super(IppidbTask, self).update_state(
task_id=task_id, state=state, meta=meta, **kwargs
)
......@@ -43,6 +43,7 @@ INSTALLED_APPS = [
"django.contrib.messages",
"django.contrib.staticfiles",
"django_extensions",
"django_celery_results",
"crispy_forms",
"live_settings",
"ippidb",
......@@ -182,3 +183,6 @@ MARVINJS_APIKEY = None
GALAXY_BASE_URL = None
GALAXY_APIKEY = None
GALAXY_COMPOUNDPROPERTIES_WORKFLOWID = None
# celery setting.
CELERY_RESULT_BACKEND = 'django-db'
......@@ -45,6 +45,7 @@ INSTALLED_APPS = [
"django.contrib.messages",
"django.contrib.staticfiles",
"django_extensions",
"django_celery_results",
"crispy_forms",
"live_settings",
"ippidb",
......@@ -161,3 +162,6 @@ LOGOUT_REDIRECT_URL = "/"
# django-crispy-forms
################################################################################
CRISPY_TEMPLATE_PACK = "bootstrap4"
# celery setting.
CELERY_RESULT_BACKEND = 'django-db'
......@@ -59,6 +59,12 @@
<input type="submit" value="Plots generation" name="_save"/>
</form>
<hr/>
<form method="POST" action="/admin/launch_test_command/"
style="display:block">{% csrf_token %}
<input type="submit" value="test command" name="_save"/>
</form>
<hr/>
</div>
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment