Skip to content
Snippets Groups Projects

Update operation on NCBI taxonomy

Merged Kenzo-Hugo Hillion requested to merge 16-hierarchy-ncbi into dev
Files
30
from celery.result import AsyncResult
from fastapi import APIRouter
from fastapi import APIRouter, status
from fastapi.responses import JSONResponse
@@ -7,39 +7,90 @@ from app.celery.tasks.kegg import (
create_or_update_all_ko,
CREATE_KEGG_TASK_NAME,
)
from app.celery.tasks.ncbi_taxonomy import (
create_or_update_all_ncbi_tax,
generate_ncbi_tax_hierarchy,
CREATE_NCBI_TAX_TASK_NAME,
GENERATE_HIERARCHY_TASK_NAME,
)
from app.celery.utils import (
is_running_task,
get_task_id,
)
from app.db import reinit_db
from app.core.schemas.tasks.base import CreateTask
from app.core.schemas.tasks.db import TaskDbOperation
router = APIRouter()
@router.post("/create_kegg_ko")
@router.delete("/reinit_db")
def reinitialize_db():
reinit_db()
return JSONResponse({"empty": True}, status_code=201)
@router.post(
"/create_kegg_ko", response_model=CreateTask, status_code=status.HTTP_201_CREATED
)
def create_kegg_ko():
task_name = CREATE_KEGG_TASK_NAME
if is_running_task(task_name):
task_id = get_task_id(task_name)
return JSONResponse(
{"status": "Process already running", "task_id": task_id}, status_code=200
)
task_obj = CreateTask(task_status="STARTED", task_id=task_id)
return JSONResponse(status_code=status.HTTP_200_OK, content=task_obj.dict())
task = create_or_update_all_ko.delay()
return JSONResponse({"task_id": task.id}, status_code=201)
task_obj = CreateTask(task_status="PENDING", task_id=task.id)
return task_obj
@router.get("/tasks/{task_id}")
@router.post(
"/create_ncbi_taxonomy",
response_model=CreateTask,
status_code=status.HTTP_201_CREATED,
)
def create_ncbi_taxonomy():
task_name = CREATE_NCBI_TAX_TASK_NAME
if is_running_task(task_name):
task_id = get_task_id(task_name)
task_obj = CreateTask(task_status="STARTED", task_id=task_id)
return JSONResponse(status_code=status.HTTP_200_OK, content=task_obj.dict())
task = create_or_update_all_ncbi_tax.delay()
task_obj = CreateTask(task_status="PENDING", task_id=task.id)
return task_obj
@router.post(
"/ncbi_taxonomy_hierarchy",
response_model=CreateTask,
status_code=status.HTTP_201_CREATED,
)
def generate_ncbi_taxonomy_hierarchy():
task_name = GENERATE_HIERARCHY_TASK_NAME
if is_running_task(task_name):
task_id = get_task_id(task_name)
task_obj = CreateTask(task_status="STARTED", task_id=task_id)
return JSONResponse(status_code=status.HTTP_200_OK, content=task_obj.dict())
task = generate_ncbi_tax_hierarchy.delay()
task_obj = CreateTask(task_status="PENDING", task_id=task.id)
return task_obj
@router.get(
"/tasks/{task_id}", response_model=TaskDbOperation, status_code=status.HTTP_200_OK
)
def get_status(task_id):
task_result = AsyncResult(task_id)
if task_result.failed():
result = {
"task_id": task_id,
"task_status": "failed",
"task_status": task_result.status,
}
return JSONResponse(result)
result = {
"task_id": task_id,
"task_status": task_result.status,
"task_result": task_result.result,
}
return JSONResponse(result)
result = TaskDbOperation(
task_id=task_id,
task_status=task_result.status,
task_result=task_result.result,
)
return result
Loading