Select Git revision
views.py 42.33 KiB
# Pandas
# import pandas as pd
import json
# Django
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group
from django.http.request import QueryDict
from django.shortcuts import get_object_or_404
from django.http import JsonResponse
from django.db import transaction, DataError
from django.db.models import ProtectedError
from django.db.utils import IntegrityError
# Guardian
from guardian.shortcuts import assign_perm, get_perms
import pandas as pd
# DRF
from rest_framework import (
views,
viewsets,
status,
serializers,
mixins,
authentication,
)
from rest_framework.permissions import IsAuthenticated
# from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.decorators import action
from rest_framework_simplejwt.authentication import JWTAuthentication
# DRF guardian
from rest_framework_guardian import filters
from .exceptions import RessourceAlreadyExists
from bioblend import ConnectionError
from bioblend.galaxy.datasets import DatasetStateException
# Serializers
from .serializers import (
ChangePasswordSerializer,
ExperimentLightSerializer,
FounderSerializer,
GroupSerializer,
HaplotypeSerializer,
LodScoreSerializer,
PeakGenesSerializer,
PeakSerializer,
ProjectGroupRoleSerializer,
ProjectGroupSerializer,
ProjectMemberSerializer,
ProjectRoleSerializer,
RoleGroupSerializer,
SnpsAssociationSerializer,
TopSnpsSerializer,
UserSerializer,
ProjectSerializer,
DepartmentSerializer,
TeamSerializer,
RoleSerializer,
CclineSerializer,
ExperimentCclineSerializer,
ExperimentSerializer,
PhenotypeCategorySerializer,
PhenotypeSerializer,
AnalysisSerializer,
CoefficientSerializer,
)
# Models
from .models import (
Coefficient,
Founder,
Haplotypes,
LodScore,
Peak,
PeakGenes,
Project,
ProjectRole,
Role,
ProjectGroup,
SnpsAssociation,
Team,
Department,
RoleGroup,
Experiment,
Ccline,
ExperimentCcline,
TopSnps,
PhenotypeCategory,
Phenotype,
Analysis,
)
from .galaxy import GalaxyRunner
from .tasks import (
load_peak_genes,
load_refine_peaks,
load_coefficient,
load_haplotypes,
load_lodscores_results,
load_snps_association,
load_top_snps,
download_formatted_qtl2_data,
)
# Permissions
from .permissions import CustomObjectPermissions, CustomPermissions, IsAdminOrIsSelf
# Action prefixes combined with a model name to build permission codenames
# (see PhenotypeCategoryViewSet.permissions: "api.<action>_phenotypecategory").
PERMISSION_NAMES = ["add", "change", "delete", "view"]
# filters
# from .filters import ObjectAnalysisPermissionsFilter
# Settings
# Create your views here.
# Active user model, resolved once when this module is imported.
User = get_user_model()
# Single GalaxyRunner instance shared by every view in this module.
# NOTE(review): created at import time — confirm the GalaxyRunner constructor
# does not perform network I/O, otherwise importing this module can fail.
runner = GalaxyRunner()
class ErrorDataResponse:
    """Reusable description of an API error.

    Holds an error category, a human-readable message and an HTTP status
    code, and turns them into a DRF ``Response`` on demand.
    """

    def __init__(self, type, message, status):
        # The parameter names are part of the public signature, so they are
        # kept even though ``type`` shadows the builtin.
        self.type, self.message, self.status = type, message, status

    def get_error_response(self):
        """Return a ``Response`` with a ``{"type", "message"}`` body and the stored status."""
        payload = {"type": self.type, "message": self.message}
        return Response(payload, status=self.status)
# Shared error object returned by the views below whenever a call to the
# Galaxy instance raises ConnectionError.
galaxy_connection_error = ErrorDataResponse(
    type="galaxy",
    message=f"Not able to reach {runner.galaxy_url} instance",
    status=status.HTTP_503_SERVICE_UNAVAILABLE,
)
class AppGroupViewSet(viewsets.ModelViewSet):
    """CRUD endpoint for groups that have neither a related project group nor a related team."""

    serializer_class = GroupSerializer
    # Two successive exclusions: first groups with a projectgroup row,
    # then groups with a team row.
    queryset = (
        Group.objects
        .exclude(projectgroup__isnull=False)
        .exclude(team__isnull=False)
    )
class UserViewSet(viewsets.ModelViewSet):
    """
    CRUD endpoint for user accounts (the "AnonymousUser" row is hidden).
    """

    queryset = User.objects.exclude(username__exact="AnonymousUser")
    serializer_class = UserSerializer
    permission_classes = [CustomPermissions]
    # permission_classes = [CustomObjectPermissions]
    # permission_classes = [DjangoModelPermissions]
    # permission_classes = [DjangoObjectPermissions]
    # filter_backends = [filters.ObjectPermissionsFilter]

    def list(self, request):
        """Return every user, each annotated with the requester's object permissions on it."""
        requester = User.objects.get(pk=request.user.id)
        serializer = self.get_serializer(
            self.get_queryset(), many=True, context={"request": request}
        )
        rows = serializer.data
        for row in rows:
            target = User.objects.get(pk=row["id"])
            # Permissions the requester holds on this particular user object.
            row["permissions"] = set(get_perms(requester, target))
        return Response(rows)

    def get_serializer_class(self):
        """Use the password serializer for ``password_change``, the default one otherwise."""
        if self.action != "password_change":
            return self.serializer_class
        return ChangePasswordSerializer

    @action(
        detail=True,
        methods=["put"],
        name="Change Password",
        permission_classes=[IsAdminOrIsSelf],
    )
    def password_change(self, request, pk=None):
        """Validate the submitted password payload and apply it as a partial update."""
        serializer = ChangePasswordSerializer(
            data=request.data, context={"request": request}
        )
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        self.partial_update(request)
        return Response("Password updated successfully")
class CurrentUserViewSet(viewsets.ViewSet):
    """
    GET-only endpoint returning the serialized record of the authenticated user.
    """

    http_method_names = ["get"]
    queryset = User.objects.all()
    serializer_class = UserSerializer
    permission_classes = [CustomObjectPermissions]
    filter_backends = [filters.ObjectPermissionsFilter]

    def list(self, request, format=None):
        """Look the requester up by id and return its ``UserSerializer`` data."""
        authenticated_user = User.objects.get(pk=request.user.id)
        return Response(UserSerializer(authenticated_user).data)
class ProjectViewSet(viewsets.ModelViewSet):
    """
    API endpoint that allows projects to be viewed or edited.
    """

    queryset = Project.objects.all()
    serializer_class = ProjectSerializer
    permission_classes = [CustomObjectPermissions]
    filter_backends = [filters.ObjectPermissionsFilter]

    @action(detail=False)
    def count(self, request):
        """Return ``{"count": n}`` for the projects left after the filter backends run."""
        queryset = self.filter_queryset(self.get_queryset())
        return Response({"count": queryset.count()})

    @action(detail=True)
    def permissions(self, request, pk=None):
        """Return the requester's permissions on the project and on its project groups.

        Responds with 404 when no project has the given ``pk`` (the previous
        implementation let ``Project.DoesNotExist`` escape as a 500).
        """
        current_user = User.objects.get(pk=request.user.id)
        project = get_object_or_404(Project, pk=pk)
        permissions_set = set(get_perms(current_user, project))
        for project_group in ProjectGroup.objects.filter(project=pk):
            permissions_set.update(get_perms(current_user, project_group))
        return Response(permissions_set)

    def perform_destroy(self, instance):
        """Delete the project together with the ``Group`` rows behind its project groups.

        Everything runs inside one transaction so a failure half-way does
        not leave a project whose groups were only partially removed.
        """
        with transaction.atomic():
            for project_group in ProjectGroup.objects.filter(project=instance.id):
                # Delete through the parent Group row, as the original code did.
                Group.objects.get(pk=project_group.group_ptr.id).delete()
            instance.delete()
class ProjectMemberViewSet(
    # viewsets.ModelViewSet
    # mixins.UpdateModelMixin,
    # mixins.ListModelMixin,
    # viewsets.GenericViewSet,
    viewsets.ReadOnlyModelViewSet
):
    """Endpoint exposing the users attached to a project.

    A user is a member when one of their groups is a project group of the
    project, or a team linked to the project.
    """

    serializer_class = ProjectMemberSerializer

    def get_queryset(self):
        """Return the union of project-group members and team members of the project."""
        project_group_users = User.objects.filter(
            groups__projectgroup__project__id=self.kwargs["project_pk"]
        )
        teams_group_users = User.objects.filter(
            groups__team__projects__id=self.kwargs["project_pk"],
        )
        return project_group_users.union(teams_group_users)

    def list(self, request, project_pk=None):
        """Return the members, each annotated with their permissions on the project.

        The permission set merges what the member holds on the project
        itself and on every project group of that project.
        """
        serializer = self.get_serializer(
            self.get_queryset(),
            many=True,
            context={"request": request, "project_pk": int(project_pk)},
        )
        data = serializer.data
        if not data:
            # No members: nothing to annotate, and no project lookup is needed.
            return Response(data)

        # These lookups do not depend on the member, so run them once
        # instead of once per member.
        project = Project.objects.get(pk=project_pk)
        project_groups = list(ProjectGroup.objects.filter(project=project_pk))

        for member in data:
            member_user = User.objects.get(pk=member["id"])
            member_permissions = set(get_perms(member_user, project))
            for pg in project_groups:
                member_permissions.update(get_perms(member_user, pg))
            member["permissions"] = member_permissions
        return Response(data)

    def update(self, request, *args, **kwargs):
        """Update the user identified by ``pk`` with the request payload.

        ``ReadOnlyModelViewSet`` does not include ``UpdateModelMixin``, so
        there is no ``perform_update`` to delegate to; the serializer is
        saved directly, which is what that hook does by default.
        """
        partial = kwargs.pop("partial", False)
        # instance = self.get_object()
        instance = get_object_or_404(User, pk=kwargs.get("pk"))
        serializer = self.get_serializer(instance, data=request.data, partial=partial)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        if getattr(instance, "_prefetched_objects_cache", None):
            # If 'prefetch_related' has been applied to a queryset, we need to
            # forcibly invalidate the prefetch cache on the instance.
            instance._prefetched_objects_cache = {}
        return Response(serializer.data)
class CclineViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only endpoint over all ``Ccline`` rows."""

    queryset = Ccline.objects.all()
    serializer_class = CclineSerializer
class FounderViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only endpoint over all ``Founder`` rows."""

    queryset = Founder.objects.all()
    serializer_class = FounderSerializer
class ExperimentViewSet(viewsets.ModelViewSet):
    """Endpoint over experiments, serialized with the light serializer and filtered by object permissions."""

    queryset = Experiment.objects.all()
    serializer_class = ExperimentLightSerializer
    permission_classes = [CustomObjectPermissions]
    filter_backends = [filters.ObjectPermissionsFilter]

    def list(self, request, *args, **kwargs):
        """Return the filtered experiments, paginated when a paginator is active."""
        experiments = self.filter_queryset(self.get_queryset())
        page = self.paginate_queryset(experiments)
        if page is None:
            # No pagination configured: serialize the whole filtered queryset.
            return Response(self.get_serializer(experiments, many=True).data)
        page_data = self.get_serializer(page, many=True).data
        return self.get_paginated_response(page_data)

    @action(detail=False)
    def count(self, request):
        """Return ``{"count": n}`` for the filtered experiments."""
        visible = self.filter_queryset(self.get_queryset())
        return Response({"count": visible.count()})
class ProjectExperimentViewSet(viewsets.ModelViewSet):
    """
    API endpoint that allows experiment to be viewed or edited.
    """

    serializer_class = ExperimentSerializer
    queryset = Experiment.objects.all()
    permission_classes = [CustomObjectPermissions]
    filter_backends = [filters.ObjectPermissionsFilter]

    def addExperimentPhenotype(self, experiment, experiment_cc_line, metadata, value):
        """Store one phenotype value and link its category to the experiment.

        ``metadata`` must expose the category id under the ``"id"`` key.
        Raises ``PhenotypeCategory.DoesNotExist`` when that id is unknown.
        """
        phenotype_category = PhenotypeCategory.objects.get(id=metadata.get("id"))
        Phenotype.objects.create(
            category=phenotype_category,
            experiment_cc_line=experiment_cc_line,
            value=value,
        )
        experiment.phenotype_categories.add(phenotype_category)

    def create(self, request, *args, **kwargs):
        """Create an experiment from an uploaded CSV file and its column metadata.

        The request must carry ``experiment-metadata`` (a JSON list of
        objects with a ``colIndex`` key) and ``file`` (a CSV). Columns from
        index 4 onward are treated as phenotype columns. The experiment, its
        lines and phenotypes, and the experiment permissions of each project
        role are written in one transaction.

        Raises ``ValidationError`` (HTTP 400) for missing fields, invalid
        metadata JSON, unknown CC lines or phenotype categories, and database
        integrity or data errors. Responds with 404 for an unknown project.
        """
        # Reject an incomplete payload explicitly instead of failing with a
        # KeyError (HTTP 500) on the pops below.
        missing = [
            key for key in ("experiment-metadata", "file") if key not in request.data
        ]
        if missing:
            raise serializers.ValidationError(
                {key: "This field is required." for key in missing}
            )
        # NOTE(review): the [0] indexing below assumes request.data is a
        # QueryDict (multipart upload), whose pop() returns a list.
        raw_experiment_metadata = request.data.pop("experiment-metadata")
        up_file = request.data.pop("file")

        project = get_object_or_404(Project, pk=kwargs["project_pk"])
        project_serializer = ProjectSerializer(project, context={"request": request})
        request.data["project"] = project_serializer.data["url"]

        try:
            experiment_metadata = json.loads(raw_experiment_metadata[0])
        except (IndexError, TypeError, ValueError) as e:
            raise serializers.ValidationError(
                {"experiment-metadata": "Invalid JSON."}
            ) from e
        column_exp_metadata = {md["colIndex"]: md for md in experiment_metadata}

        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        experiment_df = pd.read_csv(up_file[0])
        phenotype_index_start = 4
        phenotype_columns = experiment_df.columns[phenotype_index_start:]
        try:
            with transaction.atomic():
                self.perform_create(serializer)
                experiment = serializer.instance
                for _, row in experiment_df.iterrows():
                    cc_line_m = Ccline.objects.get(id_cc=row["Mother_Line"])
                    cc_line_f = Ccline.objects.get(id_cc=row["Father_Line"])
                    experiment_cc_line = ExperimentCcline.objects.create(
                        experiment=experiment,
                        cc_line_id=row["Individual_ID"],
                        cc_line_m=cc_line_m,
                        cc_line_f=cc_line_f,
                        sex=row["Sex"],
                    )
                    for idx, phenotype_col in enumerate(phenotype_columns):
                        column_metadata = column_exp_metadata[
                            idx + phenotype_index_start
                        ]
                        self.addExperimentPhenotype(
                            experiment,
                            experiment_cc_line,
                            column_metadata,
                            row[phenotype_col],
                        )
                for project_role in ProjectRole.objects.all():
                    project_group = ProjectGroup.objects.get(
                        project=project, project_role=project_role,
                    )
                    permissions = project_role.permissions.filter(
                        content_type__model="experiment"
                    )
                    for perm in permissions:
                        assign_perm(perm, project_group)
                        assign_perm(perm, project_group, experiment)
        # The atomic block has already rolled back everything it wrote,
        # including the experiment row, so no manual delete is needed here.
        except Ccline.DoesNotExist as e:
            raise serializers.ValidationError("Ccline does not exist.") from e
        except PhenotypeCategory.DoesNotExist as e:
            raise serializers.ValidationError(
                "Phenotype category does not exist."
            ) from e
        except IntegrityError as e:
            # if "experiement_cc_line_unique" in e:
            #     raise serializers.ValidationError(
            #         "Duplicate ccline_id in the experiment file"
            #     )
            raise serializers.ValidationError(e)
        except DataError as e:
            raise serializers.ValidationError(e)

        headers = self.get_success_headers(serializer.data)
        return Response(
            serializer.data, status=status.HTTP_201_CREATED, headers=headers
        )

    # def get_object(self):
    #     obj = get_object_or_404(
    #         self.get_queryset(), pk=self.kwargs["pk"], project=self.kwargs["project_pk"]
    #     )
    #     self.check_object_permissions(self.request, obj)
    #     return obj

    def list(self, request, project_pk=None):
        """Return every experiment of the given project."""
        queryset = Experiment.objects.filter(project=project_pk)
        serializer = ExperimentSerializer(
            queryset, many=True, context={"request": request}
        )
        return Response(serializer.data)

    def retrieve(self, request, pk=None, project_pk=None):
        """Return one experiment, or 404 if it is not part of the given project."""
        queryset = Experiment.objects.filter(pk=pk, project=project_pk)
        experiment = get_object_or_404(queryset, pk=pk)
        serializer = ExperimentSerializer(experiment, context={"request": request})
        return Response(serializer.data)

    @action(detail=True)
    def permissions(self, request, pk=None, project_pk=None):
        """Return the requester's object permissions on the experiment (404 if unknown)."""
        current_user = User.objects.get(pk=request.user.id)
        experiment = get_object_or_404(Experiment, pk=pk, project=project_pk)
        return Response(get_perms(current_user, experiment))

    @action(detail=True)
    def phenotypes(self, request, pk=None, project_pk=None):
        """Return the result of ``get_experiment_data()`` for the experiment."""
        return Response(self.get_object().get_experiment_data())

    @action(detail=True)
    def phenotype_types(self, request, pk=None, project_pk=None):
        """Return the result of ``get_experiment_phenotype_types()`` for the experiment."""
        return Response(self.get_object().get_experiment_phenotype_types())
class ExperimentCclineViewSet(viewsets.ModelViewSet):
    """Endpoint over the CC lines of an experiment, scoped by project and experiment."""

    queryset = ExperimentCcline.objects.all()
    serializer_class = ExperimentCclineSerializer

    def list(self, request, project_pk=None, experiment_pk=None):
        """Return the lines of the given experiment within the given project."""
        lines = ExperimentCcline.objects.filter(
            experiment__project=project_pk, experiment=experiment_pk
        )
        payload = ExperimentCclineSerializer(
            lines, many=True, context={"request": request}
        ).data
        return Response(payload)

    def retrieve(self, request, pk=None, project_pk=None, experiment_pk=None):
        """Return one line, or 404 when it is outside the given experiment or project."""
        scoped = ExperimentCcline.objects.filter(
            pk=pk, experiment__project=project_pk, experiment=experiment_pk
        )
        line = get_object_or_404(scoped, pk=pk)
        payload = ExperimentCclineSerializer(line, context={"request": request}).data
        return Response(payload)
class PhenotypeCategoryResolveViewSet(
    mixins.CreateModelMixin,
    # mixins.ListModelMixin,
    # mixins.RetrieveModelMixin,
    viewsets.GenericViewSet,
):
    """Resolve a batch of phenotype-category descriptions to stored categories by name."""

    queryset = PhenotypeCategory.objects.all()
    serializer_class = PhenotypeCategorySerializer
    permission_classes = [IsAuthenticated]

    def create(self, request):
        """Return one entry per submitted item, in order.

        An item whose ``name`` matches a stored category yields that
        category's serialized data plus the item's ``colid``. Any failure
        while resolving yields only ``{"colid": ...}``.
        """
        resolved = []
        for requested in request.data:
            try:
                # Only the name is matched; the nature/location/datatype/
                # dataclass criteria are disabled.
                category = self.get_queryset().get(name=requested["name"])
                entry = self.get_serializer(category).data
                entry["colid"] = requested["colid"]
            except Exception:
                entry = {"colid": requested["colid"]}
            resolved.append(entry)
        return Response(resolved)
class PhenotypeCategoryViewSet(viewsets.ModelViewSet):
    """CRUD endpoint over phenotype categories."""

    queryset = PhenotypeCategory.objects.all()
    serializer_class = PhenotypeCategorySerializer
    permission_classes = [CustomPermissions]

    def perform_destroy(self, instance):
        """Delete the category; a ``ProtectedError`` is reported as a validation error."""
        try:
            instance.delete()
        except ProtectedError as protected:
            print(protected)
            message = f"The phenotype category {instance.name} is apply on phenotypes. You cannot delete it !!"
            raise serializers.ValidationError(message)

    @action(detail=False)
    def permissions(self, request, pk=None):
        """Return which of the ``PERMISSION_NAMES`` actions the requester may perform on categories."""
        requester = request.user
        granted = {
            name
            for name in PERMISSION_NAMES
            if requester.has_perm(f"api.{name}_phenotypecategory")
        }
        return Response(granted)
class PhenotypeViewSet(viewsets.ModelViewSet):
    """CRUD endpoint over phenotypes, scoped by the project and experiment in the URL."""

    serializer_class = PhenotypeSerializer
    queryset = Phenotype.objects.all()

    def get_queryset(self):
        """Return the phenotypes matching the ``project_pk`` and ``experiment_pk`` URL kwargs."""
        # NOTE(review): elsewhere in this file phenotypes are created with
        # category / experiment_cc_line / value only — confirm that the
        # Phenotype model really exposes ``project`` and ``experiment`` lookups.
        url_kwargs = self.kwargs
        return Phenotype.objects.filter(
            project=url_kwargs["project_pk"],
            experiment=url_kwargs["experiment_pk"],
        )
class TeamViewSet(viewsets.ModelViewSet):
    """CRUD endpoint over all teams (object-permission filtering is currently disabled)."""

    serializer_class = TeamSerializer
    queryset = Team.objects.all()
    # permission_classes = [CustomObjectPermissions]
    # filter_backends = [filters.ObjectPermissionsFilter]
class DepartmentViewSet(viewsets.ModelViewSet):
    """CRUD endpoint over departments, filtered by object permissions."""

    serializer_class = DepartmentSerializer
    queryset = Department.objects.all()
    filter_backends = [filters.ObjectPermissionsFilter]
    permission_classes = [CustomObjectPermissions]
class ProjectRoleViewSet(viewsets.ModelViewSet):
    """CRUD endpoint over all project roles."""

    serializer_class = ProjectRoleSerializer
    queryset = ProjectRole.objects.all()
class ProjectGroupRoleViewSet(viewsets.ModelViewSet):
    """CRUD endpoint over project groups using the group/role serializer, filtered by object permissions."""

    serializer_class = ProjectGroupRoleSerializer
    queryset = ProjectGroup.objects.all()
    filter_backends = [filters.ObjectPermissionsFilter]
    permission_classes = [CustomObjectPermissions]
class ProjectGroupViewSet(viewsets.ModelViewSet):
    """CRUD endpoint over the project groups of one project (``project_pk`` URL kwarg)."""

    # queryset = ProjectGroup.objects.all()
    serializer_class = ProjectGroupSerializer
    permission_classes = [CustomObjectPermissions]
    filter_backends = [filters.ObjectPermissionsFilter]

    def get_queryset(self):
        """Return the project groups of the project named in the URL."""
        return ProjectGroup.objects.filter(project=self.kwargs["project_pk"])

    def get_object(self):
        """Fetch the group by ``pk`` within the project and check object permissions on it."""
        obj = get_object_or_404(self.get_queryset(), pk=self.kwargs["pk"])
        self.check_object_permissions(self.request, obj)
        return obj

    def list(self, request, project_pk=None):
        """Return every project group of the given project."""
        # perm_queryset = get_objects_for_user(request.user, "api.view_analysis")
        queryset = ProjectGroup.objects.filter(project=project_pk)
        serializer = self.get_serializer(
            queryset, many=True, context={"request": request}
        )
        return Response(serializer.data)

    def retrieve(self, request, pk=None, project_pk=None):
        """Return one project group, or 404 if it is not part of the given project."""
        # perm_queryset = get_objects_for_user(request.user, "api.view_analysis")
        queryset = ProjectGroup.objects.filter(pk=pk, project=project_pk)
        instance = get_object_or_404(queryset, pk=pk)
        serializer = self.get_serializer(instance, context={"request": request})
        return Response(serializer.data)

    def update(self, request, *args, **kwargs):
        """Update a project group and, optionally, its membership.

        ``users`` lists user ids to move into this group; they are first
        removed from every group of the same project so a user belongs to at
        most one of them. ``userToRemove`` is a single user id to take out of
        this group; an unknown id gives a 404. The remaining payload goes
        through the serializer as usual.
        """
        partial = kwargs.pop("partial", False)
        instance = self.get_object()
        if "users" in request.data:
            with transaction.atomic():
                # A QueryDict needs getlist() to return every submitted value.
                if isinstance(request.data, QueryDict):
                    req_users = request.data.getlist("users")
                else:
                    req_users = request.data["users"]
                users = list(User.objects.filter(id__in=req_users))
                # Remove the users from all project groups of this project so
                # a user can be in only one project group related to it.
                for pg in ProjectGroup.objects.filter(project=instance.project):
                    pg.user_set.remove(*users)
                instance.user_set.add(*[user.id for user in users])
        if "userToRemove" in request.data:
            with transaction.atomic():
                user_to_remove = get_object_or_404(
                    User, id=request.data.get("userToRemove")
                )
                instance.user_set.remove(user_to_remove)
        serializer = self.get_serializer(instance, data=request.data, partial=partial)
        serializer.is_valid(raise_exception=True)
        self.perform_update(serializer)
        if getattr(instance, "_prefetched_objects_cache", None):
            # If 'prefetch_related' has been applied to a queryset, we need to
            # forcibly invalidate the prefetch cache on the instance.
            instance._prefetched_objects_cache = {}
        return Response(serializer.data)
class RoleViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only endpoint over all roles (object-permission filtering is currently disabled)."""

    serializer_class = RoleSerializer
    queryset = Role.objects.all()
    # permission_classes = [CustomObjectPermissions]
    # filter_backends = [filters.ObjectPermissionsFilter]
class RoleGroupViewSet(viewsets.ModelViewSet):
    """CRUD endpoint over all role groups."""

    serializer_class = RoleGroupSerializer
    queryset = RoleGroup.objects.all()
class PeakViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only endpoint over the peaks of an analysis and their derived datasets.

    Each endpoint reads from the database first. When nothing is stored yet
    it runs the matching loader task, waits for it, then reads again.
    """

    serializer_class = PeakSerializer
    queryset = Peak.objects.all()
    permission_classes = [CustomObjectPermissions]
    filter_backends = [filters.ObjectPermissionsFilter]

    def _load_if_missing(self, queryset, task, analysis_pk, not_found_message):
        """Make sure ``queryset`` has rows, running ``task`` when it is empty.

        Returns ``None`` when data is available (already stored, or stored by
        the task), otherwise the error ``Response`` the caller must return:
        503 when Galaxy is unreachable, 404 on ``DatasetStateException`` or
        when the task stored nothing, 400 on any other task failure.
        Responds with 404 when ``analysis_pk`` matches no analysis.
        """
        if queryset.exists():
            return None
        analysis_db = get_object_or_404(Analysis, id=analysis_pk)
        ids = (analysis_pk, analysis_db.galaxy_history_id, None)
        try:
            # Blocks until the task has finished.
            task.apply_async(args=[ids]).get()
        except ConnectionError:
            return galaxy_connection_error.get_error_response()
        except DatasetStateException as e:
            return Response(str(e), status=status.HTTP_404_NOT_FOUND)
        except Exception as e:
            return Response(str(e), status=status.HTTP_400_BAD_REQUEST)
        # The queryset is lazy, so this check runs a fresh query and sees
        # whatever the task just stored.
        if not queryset.exists():
            return Response(not_found_message, status=status.HTTP_404_NOT_FOUND)
        return None

    def list(self, request, project_pk=None, analysis_pk=None):
        """Return the peaks of the analysis, loading them from Galaxy if none are stored."""
        queryset = self.peaks_per_analysis_queryset(project_pk, analysis_pk)
        error = self._load_if_missing(
            queryset, load_refine_peaks, analysis_pk, "No peak found"
        )
        if error is not None:
            return error
        return self.get_serialized_response(queryset, request)

    def retrieve(self, request, pk=None, analysis_pk=None, project_pk=None):
        """Return one peak, or 404 when it is outside the given analysis or project."""
        queryset = Peak.objects.filter(
            pk=pk, analysis=analysis_pk, analysis__project=project_pk
        )
        peak = get_object_or_404(queryset, pk=pk)
        serializer = PeakSerializer(peak, context={"request": request})
        return Response(serializer.data)

    def get_serialized_response(self, queryset, request):
        """Serialize a peak queryset into a ``Response``."""
        serializer = PeakSerializer(queryset, many=True, context={"request": request})
        return Response(serializer.data)

    def peaks_per_analysis_queryset(self, project_pk, analysis_pk):
        """Return the peaks belonging to the given analysis within the given project."""
        return Peak.objects.filter(analysis__project=project_pk, analysis=analysis_pk)

    @action(detail=True)
    def coefficients(self, request, pk=None, project_pk=None, analysis_pk=None):
        """Return the first ``Coefficient`` row of the peak."""
        queryset = Coefficient.objects.filter(peak=pk)
        error = self._load_if_missing(
            queryset, load_coefficient, analysis_pk, "No coefficient found"
        )
        if error is not None:
            return error
        return Response(CoefficientSerializer(queryset.first()).data)

    @action(detail=True)
    def snps_association(self, request, pk=None, project_pk=None, analysis_pk=None):
        """Return the first ``SnpsAssociation`` row of the peak."""
        queryset = SnpsAssociation.objects.filter(peak=pk)
        error = self._load_if_missing(
            queryset, load_snps_association, analysis_pk, "No snps association found"
        )
        if error is not None:
            return error
        return Response(SnpsAssociationSerializer(queryset.first()).data)

    @action(detail=True)
    def top_snps(self, request, pk=None, project_pk=None, analysis_pk=None):
        """Return the first ``TopSnps`` row of the peak."""
        queryset = TopSnps.objects.filter(peak=pk)
        error = self._load_if_missing(
            queryset, load_top_snps, analysis_pk, "No top snps found"
        )
        if error is not None:
            return error
        return Response(TopSnpsSerializer(queryset.first()).data)

    @action(detail=True)
    def genes(self, request, pk=None, project_pk=None, analysis_pk=None):
        """Return the first ``PeakGenes`` row of the peak."""
        queryset = PeakGenes.objects.filter(peak=pk)
        error = self._load_if_missing(
            queryset, load_peak_genes, analysis_pk, "No genes list found for peak"
        )
        if error is not None:
            return error
        return Response(PeakGenesSerializer(queryset.first()).data)

    @action(detail=True)
    def haplotypes(self, request, pk=None, project_pk=None, analysis_pk=None):
        """Return the first ``Haplotypes`` row of the peak."""
        queryset = Haplotypes.objects.filter(peak=pk)
        # The previous 404 text here was the copy-pasted "No peak found".
        error = self._load_if_missing(
            queryset, load_haplotypes, analysis_pk, "No haplotypes found"
        )
        if error is not None:
            return error
        return Response(HaplotypeSerializer(queryset.first()).data)
class LodScoreViewSet(viewsets.ModelViewSet):
    """Endpoint over LOD scores, scoped by project and analysis."""

    queryset = LodScore.objects.all()
    serializer_class = LodScoreSerializer
    permission_classes = [CustomObjectPermissions]
    filter_backends = [filters.ObjectPermissionsFilter]

    def list(self, request, project_pk=None, analysis_pk=None):
        """Return the LOD scores of the given analysis within the given project."""
        scores = LodScore.objects.filter(
            analysis__project=project_pk, analysis=analysis_pk
        )
        payload = LodScoreSerializer(
            scores, many=True, context={"request": request}
        ).data
        return Response(payload)

    def retrieve(self, request, pk=None, analysis_pk=None, project_pk=None):
        """Return one LOD score row, or 404 when it is outside the given analysis or project."""
        scoped = LodScore.objects.filter(
            pk=pk, analysis=analysis_pk, analysis__project=project_pk
        )
        score = get_object_or_404(scoped, pk=pk)
        return Response(LodScoreSerializer(score, context={"request": request}).data)
class AnalysisViewSet(viewsets.ModelViewSet):
    """Endpoint over analyses across projects, filtered by object permissions."""

    queryset = Analysis.objects.all()
    serializer_class = AnalysisSerializer
    permission_classes = [CustomObjectPermissions]
    filter_backends = [filters.ObjectPermissionsFilter]

    def list(self, request, *args, **kwargs):
        """Return the filtered analyses, paginated when a paginator is active."""
        analyses = self.filter_queryset(self.get_queryset())
        page = self.paginate_queryset(analyses)
        if page is None:
            # No pagination configured: serialize the whole filtered queryset.
            return Response(self.get_serializer(analyses, many=True).data)
        page_data = self.get_serializer(page, many=True).data
        return self.get_paginated_response(page_data)

    @action(detail=False)
    def count(self, request):
        """Return ``{"count": n}`` for the filtered analyses."""
        visible = self.filter_queryset(self.get_queryset())
        return Response({"count": visible.count()})
class ProjectAnalysisViewSet(viewsets.ModelViewSet):
    """Endpoint over the analyses of one project, plus Galaxy-backed detail actions."""

    serializer_class = AnalysisSerializer
    queryset = Analysis.objects.all()
    permission_classes = [CustomObjectPermissions]
    filter_backends = [filters.ObjectPermissionsFilter]

    def list(self, request, project_pk=None):
        """Return every analysis of the given project."""
        # perm_queryset = get_objects_for_user(request.user, "api.view_analysis")
        queryset = Analysis.objects.filter(project=project_pk)
        serializer = AnalysisSerializer(
            queryset, many=True, context={"request": request}
        )
        return Response(serializer.data)

    def retrieve(self, request, pk=None, project_pk=None):
        """Return one analysis, or 404 if it is not part of the given project."""
        # perm_queryset = get_objects_for_user(request.user, "api.view_analysis")
        queryset = Analysis.objects.filter(pk=pk, project=project_pk)
        analysis = get_object_or_404(queryset, pk=pk)
        serializer = AnalysisSerializer(analysis, context={"request": request})
        return Response(serializer.data)

    def destroy(self, request, *args, **kwargs):
        """Delete the Galaxy history of the analysis, then the analysis itself.

        Returns the shared 503 response, leaving the database row in place,
        when Galaxy cannot be reached.
        """
        instance = self.get_object()
        try:
            runner.delete_history(instance.galaxy_history_id)
            self.perform_destroy(instance)
            return Response(status=status.HTTP_204_NO_CONTENT)
        except ConnectionError:
            return galaxy_connection_error.get_error_response()

    def perform_destroy(self, instance):
        """Delete the analysis row."""
        instance.delete()

    @action(detail=False)
    def permissions(self, request, pk=None, project_pk=None):
        """Return the action part of each of the requester's permissions ending in "analysis"."""
        current_user = request.user
        return Response(
            [
                # "<app>.<action>_<model>" -> "<action>"
                perm.split(".")[1].split("_")[0]
                for perm in current_user.get_all_permissions()
                if perm.endswith("analysis")
            ]
        )

    @action(detail=True)
    def lodscores(self, request, pk=None, project_pk=None):
        """Return the first LOD score row of the analysis, loading it from Galaxy if absent.

        A ``DatasetStateException`` that mentions ``significance-threshold.csv``
        is tolerated; any other one gives a 400. Responds with 404 when the
        analysis does not exist.
        """
        queryset = LodScore.objects.filter(analysis=pk)
        if not queryset.exists():
            analysis_db = get_object_or_404(Analysis, id=pk)
            ids = (pk, analysis_db.galaxy_history_id, None)
            try:
                # NOTE(review): every other loader in this module is invoked as
                # ``task.apply_async(args=[ids])``; here the callable is called
                # first and ``apply_async`` is applied to its result. Confirm
                # that ``load_lodscores_results`` returns something with
                # ``apply_async`` (e.g. a Celery signature) and is not a plain task.
                load_lodscores_results(ids).apply_async().get()
            except ConnectionError:
                return galaxy_connection_error.get_error_response()
            except DatasetStateException as e:
                exception_msg = str(e)
                if "significance-threshold.csv" in exception_msg:
                    # A missing threshold file is not fatal: carry on and
                    # serialize whatever is stored.
                    print("No significance threshold")
                else:
                    return Response(exception_msg, status=status.HTTP_400_BAD_REQUEST)
            except Exception as e:
                return Response(str(e), status=status.HTTP_400_BAD_REQUEST)
            else:
                if not queryset.exists():
                    return Response(
                        "No load score found", status=status.HTTP_404_NOT_FOUND
                    )
        lodscores = LodScoreSerializer(queryset.first())
        return Response(lodscores.data)

    @action(detail=True)
    def haslodscores(self, request, pk=None, project_pk=None):
        """Return ``{"has_lodscores": bool}`` for the analysis."""
        return Response(
            {"has_lodscores": LodScore.objects.filter(analysis=pk).exists()}
        )

    @action(detail=True)
    def download_qtl2_data(self, request, pk=None, project_pk=None):
        """Run the qtl2 download task for the analysis and return the archive name it produced.

        Responds with 404 when the analysis does not exist.
        """
        analysis = get_object_or_404(Analysis, id=pk)
        try:
            result = download_formatted_qtl2_data.apply_async(
                args=[analysis.galaxy_history_id]
            ).get()
        except ConnectionError:
            return galaxy_connection_error.get_error_response()
        except DatasetStateException as e:
            return Response(str(e), status=status.HTTP_404_NOT_FOUND)
        except Exception as e:
            return Response(str(e), status=status.HTTP_400_BAD_REQUEST)
        return Response(
            {"archive_name": result, "project_id": project_pk, "analysis_id": pk}
        )

    @action(detail=True)
    def workflow_invocation_parameters(self, request, pk=None, project_pk=None):
        """Return the workflow invocation with steps sorted by ``step_id``, or ``None`` if there is no invocation id."""
        # Resolve the object once; each get_object() call repeats the lookup
        # and the object-permission check.
        analysis = self.get_object()
        if not analysis.galaxy_workflow_invocation_id:
            return Response(None)
        try:
            wf_invocation = analysis.get_workflow_invocation()
            wf_invocation["steps"].sort(key=lambda step: step["step_id"])
            return Response(wf_invocation)
        except ConnectionError:
            return galaxy_connection_error.get_error_response()

    @action(detail=True)
    def workflow_job_parameters(self, request, pk=None, project_pk=None):
        """Return the result of ``get_workflow_job_parameters()`` for the analysis."""
        return Response(self.get_object().get_workflow_job_parameters())
# class ToolsParameters(views.APIView):
# authentication_classes = [JWTAuthentication, authentication.SessionAuthentication]
# permission_classes = [IsAuthenticated]
# def get(self, request, *args, **kwargs):
# wf_id = kwargs["wf_id"]
# runner = GalaxyRunner()
# gi = runner.galaxy_instance
# try:
# wf = gi.workflows.show_workflow(wf_id)
# wf_invocation = gi.workflows.get_invocations(wf["id"])[0]
# invocation_id = wf_invocation["id"]
# invocation_details = gi.workflows.show_invocation(wf["id"], invocation_id)
# invocation_step = gi.workflows.show_invocation_step(
# wf["id"], invocation_id, invocation_details["steps"][4]["id"]
# )
# job = gi.jobs.show_job(invocation_step["job_id"])
# return Response(job)
# except ConnectionError as e:
# return Response(
# status=status.HTTP_400_BAD_REQUEST, data=json.loads(e.body)["err_msg"]
# )
# def tools_parameters(request):
# runner = GalaxyRunner()
# gi = runner.galaxy_instance
# wf = gi.workflows.get_workflows(name="cc-qtl-wf")[0]
# wf_invocation = gi.workflows.get_invocations(wf["id"])[0]
# invocation_id = wf_invocation["id"]
# invocation_details = gi.workflows.show_invocation(wf["id"], invocation_id)
# invocation_step = gi.workflows.show_invocation_step(
# wf["id"], invocation_id, invocation_details["steps"][4]["id"]
# )
# job = gi.jobs.show_job(invocation_step["job_id"])
# return JsonResponse(job, safe=False)
class WorkflowTools(views.APIView):
    """Return, for each step of a Galaxy workflow except step "0", the description of the tool it runs."""

    authentication_classes = [JWTAuthentication, authentication.SessionAuthentication]
    permission_classes = [IsAuthenticated]

    def get(self, request, *args, **kwargs):
        """Map each step id of workflow ``wf_id`` to its tool details (503 if Galaxy is unreachable)."""
        workflow_id = kwargs["wf_id"]
        # runner = GalaxyRunner()
        galaxy = runner.galaxy_instance
        # TODO worflow_id should be a parameter
        # workflow_id = request["workflow_id"]
        try:
            workflow = galaxy.workflows.show_workflow(workflow_id)
            step_tool_pairs = []
            for step_id, step in workflow["steps"].items():
                step_tool_pairs.append((step_id, step["tool_id"]))
            tools_by_step = dict(get_job_params(step_tool_pairs, galaxy))
            return Response(tools_by_step)
        except ConnectionError:
            return galaxy_connection_error.get_error_response()
class WorkflowDefaultParameters(views.APIView):
    """Return the scalar default tool inputs of each step of a Galaxy workflow."""

    authentication_classes = [JWTAuthentication, authentication.SessionAuthentication]
    permission_classes = [IsAuthenticated]

    def get(self, request, *args, **kwargs):
        """Map each step id (except "0") of workflow ``wf_id`` to its non-null, non-dict tool inputs."""
        workflow_id = kwargs["wf_id"]
        # runner = GalaxyRunner()
        galaxy = runner.galaxy_instance
        try:
            workflow = galaxy.workflows.show_workflow(workflow_id)
            defaults_by_step = {}
            for step_id, step in workflow["steps"].items():
                if step_id == "0":
                    continue
                # Keep only plain values: drop unset inputs and dict-valued inputs.
                defaults_by_step[step_id] = {
                    input_id: input_val
                    for input_id, input_val in step["tool_inputs"].items()
                    if input_val is not None and type(input_val) is not dict
                }
            return Response(defaults_by_step)
        except ConnectionError:
            return galaxy_connection_error.get_error_response()
def workflow_default_parameters(request):
    """Return the scalar default tool inputs of each step of the "cc-qtl-wf" workflow.

    This is a plain Django view, so every branch returns a ``JsonResponse``:
    a DRF ``Response`` cannot be rendered outside a DRF view. Step "0" is
    skipped, and only inputs that are neither ``None`` nor dicts are kept.

    Returns 404 when Galaxy has no workflow with that name and 503 when
    Galaxy cannot be reached.
    """
    # runner = GalaxyRunner()
    try:
        gi = runner.galaxy_instance
        workflows = gi.workflows.get_workflows(name="cc-qtl-wf")
        if not workflows:
            # Indexing an empty list would raise IndexError (HTTP 500).
            return JsonResponse(
                {"type": "galaxy", "message": "Workflow cc-qtl-wf not found"},
                status=status.HTTP_404_NOT_FOUND,
            )
        wf = gi.workflows.show_workflow(workflows[0]["id"])
        wf_steps_inputs = {
            step_id: {
                input_id: input_val
                for input_id, input_val in step["tool_inputs"].items()
                if input_val is not None and not isinstance(input_val, dict)
            }
            for step_id, step in wf["steps"].items()
            if step_id != "0"
        }
        return JsonResponse(wf_steps_inputs)
    except ConnectionError:
        # Same payload and status as the shared Galaxy error, in a form a
        # plain Django view can return.
        return JsonResponse(
            {
                "type": galaxy_connection_error.type,
                "message": galaxy_connection_error.message,
            },
            status=galaxy_connection_error.status,
        )
def get_job_params(job_ids, gi):
    """Yield ``(step_id, tool details)`` for every pair in ``job_ids`` except step "0".

    ``job_ids`` is an iterable of ``(step_id, id)`` pairs; each id is passed
    to ``gi.tools.show_tool`` with ``io_details=True``.
    """
    for step_id, job_id in job_ids:
        if step_id == "0":
            # Step "0" is skipped.
            continue
        tool_details = gi.tools.show_tool(job_id, io_details=True)
        yield step_id, tool_details
class CcQtlWorkflows(views.APIView):
    """List the Galaxy workflows named "cc-qtl-wf", each with its full details attached."""

    authentication_classes = [JWTAuthentication, authentication.SessionAuthentication]
    permission_classes = [IsAuthenticated]

    def get(self, request, *args, **kwargs):
        """Return the matching workflows with a ``details`` key added (503 if Galaxy is unreachable)."""
        # runner = GalaxyRunner()
        gi = runner.galaxy_instance
        try:
            workflows = [
                wf for wf in gi.workflows.get_workflows() if wf["name"] == "cc-qtl-wf"
            ]
            for workflow in workflows:
                # The debug pprint of every workflow that used to run here
                # has been removed.
                workflow["details"] = gi.workflows.show_workflow(workflow["id"])
            return Response(workflows)
        except ConnectionError:
            return galaxy_connection_error.get_error_response()