diff --git a/server/api/tasks.py b/server/api/tasks.py
index 027f6cd8ae2c3071eab5d06ae89c86145ffc604a..937eefdc83e881b1f4b5855182eb0df5b8dd05c7 100644
--- a/server/api/tasks.py
+++ b/server/api/tasks.py
@@ -26,15 +26,24 @@ runner = GalaxyRunner()
 gi = runner.galaxy_instance
 
 
-def run_analyis(project_name, analysis_name, experiment_file_path, tools_params, wf_id):
-    current_history = gi.histories.create_history(f"{project_name} - {analysis_name}")
-    history_id = current_history["id"]
+def run_analyis(
+    project_name, analysis_name, experiment_file_path, tools_params, wf_id, analysis_id
+):
+    # current_history = gi.histories.create_history()
+    # history_id = current_history["id"]
     chained_tasks = chain(
-        upload_file_to_galaxy.si(experiment_file_path, history_id, "excel.xls",),
-        genome_scan_wf.s(history_id, tools_params, wf_id),
+        create_history.si(f"{project_name} - {analysis_name}", analysis_id),
+        upload_file_to_galaxy.s(experiment_file_path, "excel.xls",),
+        genome_scan_wf.s(tools_params, wf_id),
+        load_lod_score.s(),
+        load_significance_threshold.s(),
+        load_refine_peaks.s(),
+        load_coefficient.s(),
+        load_snps_association.s(),
+        load_top_snps.s(),
+        load_haplotypes.s(),
     )
-    res = chained_tasks.delay()
-    return res.get()
+    return chained_tasks.delay()
 
 
 def load_results(history_id, analysis_name):
@@ -50,16 +59,23 @@
     return chained_tasks.delay()
 
 
-def load_lodscores_results(analysis_name, history_id):
-    chained_tasks = chain(
-        load_lod_score.si(analysis_name, history_id),
-        load_significance_threshold.si(analysis_name, history_id),
-    )
+def load_lodscores_results(ids):
+    chained_tasks = chain(load_lod_score.si(ids), load_significance_threshold.si(ids))
     return chained_tasks.delay().get()
 
 
 @celery_app.task()
-def upload_file_to_galaxy(file_path, history_id, file_type):
+def create_history(name, analysis_id):
+    current_history = gi.histories.create_history(name)
+    analysis = Analysis.objects.get(pk=analysis_id)
+    analysis.galaxy_history_id = current_history["id"]
+    analysis.save()
+    return analysis_id, current_history["id"]
+
+
+@celery_app.task()
+def upload_file_to_galaxy(ids, file_path, file_type):
+    analysis_id, history_id = ids
     # gi = GalaxyInstance(url=url, key=key, verify=False)
 
     upload_response = gi.tools.upload_file(file_path, history_id, file_type=file_type)
@@ -84,19 +100,15 @@ def upload_file_to_galaxy(file_path, history_id, file_type):
             f"{data['name']}, id : {upload_data_id}, "
             f"error : {data['misc_info']}"
         )
-    return upload_data_id
+    shutil.rmtree(Path(file_path).parent)
+    return (analysis_id, history_id, upload_data_id)
 
 
 @celery_app.task()
-def genome_scan_wf(upload_data_id, history_id, tools_params, wf_id):
-    # gi = GalaxyInstance(url=url, key=key, verify=False)
-    # workflows = gi.workflows.get_workflows()
-
-    # wf_name = "cc-qtl-wf-v2"
-    # print(workflows)
+def genome_scan_wf(ids, tools_params, wf_id):
+    analysis_id, history_id = ids
     try:
         genome_scan_wf_metadata = gi.workflows.show_workflow(wf_id)
-        # genome_scan_wf_metadata = next(wf for wf in workflows if wf["name"] == wf_name)
         datamap = dict()
         datamap["0"] = {"src": "hda", "id": upload_data_id}
         workflow_invocation = gi.workflows.invoke_workflow(
@@ -106,17 +118,21 @@
            genome_scan_wf_metadata["id"],
            datamap,
            history_id=history_id,
        )
        workflow_job_id = workflow_invocation["id"]
-        while workflow_invocation["state"] not in ["ok", "scheduled"]:
-            time.sleep(10)
-            workflow_invocation = gi.workflows.show_invocation(
-                genome_scan_wf_metadata["id"], workflow_job_id
-            )
-        if workflow_invocation["state"] in ["error", "deleted", "discarded"]:
-            raise Exception(
-                f"Error during Galaxy workflow job - name : "
-                f"id : {workflow_job_id}, "
-            )
-        return workflow_invocation
+        analysis = Analysis.objects.get(pk=analysis_id)
+        analysis.galaxy_workflow_invocation_id = workflow_job_id
+        analysis.save()
+        # gi.invocations.wait_for_invocation(workflow_job_id)
+        # while workflow_invocation["state"] not in ["ok", "scheduled"]:
+        #     time.sleep(10)
+        #     workflow_invocation = gi.workflows.show_invocation(
+        #         genome_scan_wf_metadata["id"], workflow_job_id
+        #     )
+        # if workflow_invocation["state"] in ["error", "deleted", "discarded"]:
+        #     raise Exception(
+        #         f"Error during Galaxy workflow job - name : "
+        #         f"id : {workflow_job_id}, "
+        #     )
+        return analysis_id, history_id
     except StopIteration:
         raise Exception(f"Do not find the workflow : {genome_scan_wf_metadata['name']}")
@@ -157,7 +173,8 @@ def download_formatted_qtl2_data(history_id):
 
 
 @celery_app.task()
-def load_lod_score(analysis_name, history_id):
+def load_lod_score(ids):
+    analysis_id, history_id = ids
     dataset_pattern = "lod.csv"
     try:
         datasets = gi.histories.show_history(history_id, contents=True)
@@ -176,19 +193,20 @@
     except HTTPError:
         raise Exception(f"Could not get the dataset : {dataset['name']}")
     else:
+        analysis = Analysis.objects.get(pk=analysis_id)
         file_list = downloaded_ds.decode("utf-8").split("\n")
-        lod_scores = list(lod_score_gen(file_list[1:], analysis_name, history_id))
-        analysis = Analysis.objects.get(
-            name=analysis_name, galaxy_history_id=history_id
-        )
+        lod_scores = list(lod_score_gen(file_list[1:], analysis.name, history_id))
+
         lod_entry = LodScore(analysis=analysis, lod_scores=lod_scores)
         lod_entry.save()
         # LodScore.objects.bulk_create([lod_scores])
-        return {"status": "ok", "lodscores_id": lod_entry.id}
+        return analysis_id, history_id
 
 
 @celery_app.task()
-def load_refine_peaks(analysis_name, history_id):
+def load_refine_peaks(ids):
+    print(ids)
+    analysis_id, history_id = ids
     dataset_pattern = "refine-peaks.csv"
     try:
         # TODO: test to put a fake history in order to catch exception and
@@ -212,17 +230,19 @@
     except HTTPError:
         raise Exception(f"Could not get the dataset : {dataset['name']}")
     else:
+        analysis = Analysis.objects.get(pk=analysis_id)
         file_list = downloaded_ds.decode("utf-8").split("\n")
-        bulk = peaks_gen(file_list[1:], analysis_name, history_id)
+        bulk = peaks_gen(file_list[1:], analysis.name, history_id)
         for peak in bulk:
             peak.save()
             peak_ids.append(peak.id)
-        return {"status": "ok", "peak_ids": peak_ids}
+        return analysis_id, history_id
 
 
 @celery_app.task()
-def load_significance_threshold(analysis_name, history_id):
+def load_significance_threshold(ids):
+    analysis_id, history_id = ids
     # gi = GalaxyInstance(url=url, key=key, verify=False)
     dataset_pattern = "significance-threshold.csv"
     ids = []
@@ -246,20 +266,22 @@
     except HTTPError:
         print("error dans download dataset")
         raise Exception(f"Could not get the dataset : {dataset['name']}")
     else:
+        analysis = Analysis.objects.get(pk=analysis_id)
         file_list = downloaded_ds.decode("utf-8").split("\n")
         signi_thres = zip(
             ["80%", "90%", "95%", "99%", "99.5%", "99.9%"], file_list[1:]
         )
-        bulk = threshold_gen(signi_thres, analysis_name, history_id)
+        bulk = threshold_gen(signi_thres, analysis)
         for threshold_db in bulk:
             threshold_db.save()
             ids.append(threshold_db.id)
         # LodScoreSignificanceThreshold.objects.bulk_create(bulk, batch_size=10000)
-        return {"status": "ok", "ids": ids}
+        return analysis_id, history_id
 
 
 @celery_app.task()
-def load_coefficient(analysis_name, history_id):
+def load_coefficient(ids):
+    analysis_id, history_id = ids
     dataset_pattern = "coef.csv"
     try:
         # TODO: test to put a fake history in order to catch exception and
@@ -279,10 +301,11 @@
     except HTTPError:
         raise Exception(f"Could not get the dataset : {dataset['name']}")
     else:
+        analysis = Analysis.objects.get(pk=analysis_id)
         peak_info_it = iter(dataset["name"].split("--"))
         peak_info = dict(zip(peak_info_it, peak_info_it))
         analysis = Analysis.objects.get(
-            name=analysis_name, galaxy_history_id=history_id
+            name=analysis.name, galaxy_history_id=history_id
         )
         peak = Peak.objects.get(
             analysis=analysis,
@@ -295,11 +318,12 @@
         coef_gen = coefficient_gen(file_list[1:])
         coef_db = Coefficient(peak=peak, coefficients=list(coef_gen))
         coef_db.save()
-        return {"status": "ok", "coefficient_id": coef_db.id}
+        return analysis_id, history_id
 
 
 @celery_app.task()
-def load_haplotypes(analysis_name, history_id):
+def load_haplotypes(ids):
+    analysis_id, history_id = ids
     dataset_pattern = "haplo.csv"
     try:
         datasets = gi.histories.show_history(history_id, contents=True)
@@ -319,12 +343,10 @@
     except HTTPError:
         raise Exception(f"Could not get the dataset : {dataset['name']}")
     else:
+        analysis = Analysis.objects.get(pk=analysis_id)
         file_list = downloaded_ds.decode("utf-8").split("\n")
         peak_info_it = iter(dataset["name"].split("--"))
         peak_info = dict(zip(peak_info_it, peak_info_it))
-        analysis = Analysis.objects.get(
-            name=analysis_name, galaxy_history_id=history_id
-        )
         peak = Peak.objects.get(
             analysis=analysis,
             variable_name=peak_info["var"],
@@ -334,11 +356,12 @@
         haplo_gen = haplotype_gen(file_list[1:])
         haplotype_db = Haplotypes(peak=peak, haplotypes=list(haplo_gen))
         haplotype_db.save()
-        return {"status": "ok", "id": haplotype_db.id}
+        return analysis_id, history_id
 
 
 @celery_app.task()
-def load_top_snps(analysis_name, history_id):
+def load_top_snps(ids):
+    analysis_id, history_id = ids
     dataset_pattern = "top_snps.csv"
     try:
         datasets = gi.histories.show_history(history_id, contents=True)
@@ -357,9 +380,7 @@
     except HTTPError:
         raise Exception(f"Could not get the dataset : {dataset['name']}")
     else:
-        analysis = Analysis.objects.get(
-            name=analysis_name, galaxy_history_id=history_id
-        )
+        analysis = Analysis.objects.get(pk=analysis_id)
         peak_info_it = iter(dataset["name"].split("--"))
         peak_info = dict(zip(peak_info_it, peak_info_it))
         peak = Peak.objects.get(
@@ -372,11 +393,12 @@
         top_snps = list(top_snps_gen(file_list[1:]))
         top_snps_db = TopSnps(peak=peak, top_snps=top_snps)
         top_snps_db.save()
-        return {"status": "ok", "id": top_snps_db.id}
+        return analysis_id, history_id
 
 
 @celery_app.task()
-def load_snps_association(analysis_name, history_id):
+def load_snps_association(ids):
+    analysis_id, history_id = ids
     dataset_pattern = "snps_assoc.csv"
     try:
         datasets = gi.histories.show_history(history_id, contents=True)
@@ -396,9 +418,7 @@
     except HTTPError:
         raise Exception(f"Could not get the dataset : {dataset['name']}")
     else:
-        analysis = Analysis.objects.get(
-            name=analysis_name, galaxy_history_id=history_id
-        )
+        analysis = Analysis.objects.get(pk=analysis_id)
        peak_info_it = iter(dataset["name"].split("--"))
        peak_info = dict(zip(peak_info_it, peak_info_it))
        peak = Peak.objects.get(
@@ -412,18 +432,16 @@
         snps_assoc = list(snps_association_gen(file_list[1:]))
         snps_assoc_db = SnpsAssociation(peak=peak, snps_association=snps_assoc)
         snps_assoc_db.save()
-        return {"status": "ok", "id": snps_assoc_db.id}
+        return analysis_id, history_id
 
 
-def threshold_gen(thresholds, analysis_name, history_id):
+def threshold_gen(thresholds, analysis):
     for threshold in thresholds:
         if len(threshold) == 2:
             raw_significance, threshold = threshold
             significance = float(raw_significance.strip('"%'))
             yield LodScoreSignificanceThreshold(
-                analysis=Analysis.objects.get(
-                    name=analysis_name, galaxy_history_id=history_id
-                ),
+                analysis=analysis,
                 significance=significance,
                 threshold=float(threshold),
             )
diff --git a/server/api/views.py b/server/api/views.py
index 90f5fd6d45240de841561df1e31805a167a9e738..5d8bdc4621a7af148bd9c52160fc056e30c739d4 100644
--- a/server/api/views.py
+++ b/server/api/views.py
@@ -597,7 +597,7 @@ class PeakViewSet(viewsets.ReadOnlyModelViewSet):
         if queryset.count() == 0:
             analysis_db = Analysis.objects.get(id=analysis_pk)
             load_refine_peaks.apply_async(
-                (analysis_db.name, analysis_db.galaxy_history_id)
+                args=[(analysis_pk, analysis_db.galaxy_history_id)]
             ).get()
             queryset = Peak.objects.filter(
                 analysis__project=project_pk, analysis=analysis_pk
@@ -620,9 +620,9 @@
 
         if queryset.count() == 0:
             analysis_db = Analysis.objects.get(id=analysis_pk)
-            load_coefficient.apply_async(
-                (analysis_db.name, analysis_db.galaxy_history_id)
-            ).get()
+            ids = (analysis_pk, analysis_db.galaxy_history_id)
+
+            load_coefficient.apply_async(args=[ids]).get()
 
         queryset = Coefficient.objects.filter(peak=pk)
         coefficients = CoefficientSerializer(queryset.first())
@@ -634,9 +634,8 @@
         queryset = SnpsAssociation.objects.filter(peak=pk)
         if queryset.count() == 0:
             analysis_db = Analysis.objects.get(id=analysis_pk)
-            load_snps_association.apply_async(
-                (analysis_db.name, analysis_db.galaxy_history_id)
-            ).get()
+            ids = (analysis_pk, analysis_db.galaxy_history_id)
+            load_snps_association.apply_async(args=[ids]).get()
             queryset = SnpsAssociation.objects.filter(peak=pk)
         snps = SnpsAssociationSerializer(queryset.first())
         return Response(snps.data)
@@ -647,9 +646,8 @@
 
         if queryset.count() == 0:
             analysis_db = Analysis.objects.get(id=analysis_pk)
-            load_top_snps.apply_async(
-                (analysis_db.name, analysis_db.galaxy_history_id)
-            ).get()
+            ids = (analysis_pk, analysis_db.galaxy_history_id)
+            load_top_snps.apply_async(args=[ids]).get()
 
         queryset = TopSnps.objects.filter(peak=pk)
         snps = TopSnpsSerializer(queryset.first())
@@ -662,9 +660,8 @@
 
         if queryset.count() == 0:
             analysis_db = Analysis.objects.get(id=analysis_pk)
-            load_haplotypes.apply_async(
-                (analysis_db.name, analysis_db.galaxy_history_id)
-            ).get()
+            ids = (analysis_pk, analysis_db.galaxy_history_id)
+            load_haplotypes.apply_async(args=[ids]).get()
 
         queryset = Haplotypes.objects.filter(peak=pk)
         haplotypes = HaplotypeSerializer(queryset.first())
@@ -752,7 +749,8 @@ class ProjectAnalysisViewSet(viewsets.ModelViewSet):
         # Missing data, try to get it from galaxy
         else:
             analysis_db = Analysis.objects.get(id=pk)
-            load_lodscores_results(analysis_db.name, analysis_db.galaxy_history_id)
+            ids = (pk, analysis_db.galaxy_history_id)
+            load_lodscores_results(ids)
 
         lodscores = LodScoreSerializer(LodScore.objects.filter(analysis=pk).first())
         return Response(lodscores.data)
@@ -773,9 +771,8 @@
         )
         if thresholds.count() == 0:
             analysis_db = Analysis.objects.get(id=pk)
-            load_significance_threshold.apply_async(
-                (analysis_db.name, analysis_db.galaxy_history_id)
-            ).get()
+            ids = (pk, analysis_db.galaxy_history_id)
+            load_significance_threshold.apply_async(args=[ids]).get()
             thresholds = LodScoreSignificanceThreshold.objects.filter(
                 analysis=pk
             ).order_by("significance")
@@ -789,32 +786,64 @@
     def download_qtl2_data(self, request, pk=None, project_pk=None):
         analysis = Analysis.objects.get(id=pk)
         # download_formatted_qtl2_data()
-        result = download_formatted_qtl2_data.apply_async((analysis.galaxy_history_id,))
+        result = download_formatted_qtl2_data.apply_async(
+            args=[analysis.galaxy_history_id]
+        )
         return Response(
             {"archive_name": result.get(), "project_id": project_pk, "analysis_id": pk}
         )
 
     @action(detail=True)
     def workflow_invocation_parameters(self, request, pk=None, project_pk=None):
-        return Response(self.get_object().get_workflow_invocation())
+        if self.get_object().galaxy_workflow_invocation_id:
+            wf_invocation = self.get_object().get_workflow_invocation()
+            # print(wf_invocation)
+            return Response(wf_invocation)
+        else:
+            # return Response(status=status.HTTP_204_NO_CONTENT)
+            return Response(None)
 
     @action(detail=True)
     def workflow_job_params(self, request, pk=None, project_pk=None):
         return Response(self.get_object().get_workflow_job_params())
 
 
-def tools_parameters(request):
-    runner = GalaxyRunner()
-    gi = runner.galaxy_instance
-    wf = gi.workflows.get_workflows(name="cc-qtl-wf")[0]
-    wf_invocation = gi.workflows.get_invocations(wf["id"])[0]
-    invocation_id = wf_invocation["id"]
-    invocation_details = gi.workflows.show_invocation(wf["id"], invocation_id)
-    invocation_step = gi.workflows.show_invocation_step(
-        wf["id"], invocation_id, invocation_details["steps"][4]["id"]
-    )
-    job = gi.jobs.show_job(invocation_step["job_id"])
-    return JsonResponse(job, safe=False)
+# class ToolsParameters(views.APIView):
+#     authentication_classes = [JWTAuthentication, authentication.SessionAuthentication]
+#     permission_classes = [IsAuthenticated]
+
+#     def get(self, request, *args, **kwargs):
+#         wf_id = kwargs["wf_id"]
+#         runner = GalaxyRunner()
+#         gi = runner.galaxy_instance
+#         try:
+#             wf = gi.workflows.show_workflow(wf_id)
+#             wf_invocation = gi.workflows.get_invocations(wf["id"])[0]
+#             invocation_id = wf_invocation["id"]
+#             invocation_details = gi.workflows.show_invocation(wf["id"], invocation_id)
+#             invocation_step = gi.workflows.show_invocation_step(
+#                 wf["id"], invocation_id, invocation_details["steps"][4]["id"]
+#             )
+#             job = gi.jobs.show_job(invocation_step["job_id"])
+#             return Response(job)
+#         except ConnectionError as e:
+#             return Response(
+#                 status=status.HTTP_400_BAD_REQUEST, data=json.loads(e.body)["err_msg"]
+#             )
+
+
+# def tools_parameters(request):
+#     runner = GalaxyRunner()
+#     gi = runner.galaxy_instance
+#     wf = gi.workflows.get_workflows(name="cc-qtl-wf")[0]
+#     wf_invocation = gi.workflows.get_invocations(wf["id"])[0]
+#     invocation_id = wf_invocation["id"]
+#     invocation_details = gi.workflows.show_invocation(wf["id"], invocation_id)
+#     invocation_step = gi.workflows.show_invocation_step(
+#         wf["id"], invocation_id, invocation_details["steps"][4]["id"]
+#     )
+#     job = gi.jobs.show_job(invocation_step["job_id"])
+#     return JsonResponse(job, safe=False)
 
 
 class WorkflowTools(views.APIView):
@@ -842,6 +871,39 @@ class WorkflowTools(views.APIView):
         )
 
 
+class WorkflowDefaultParameters(views.APIView):
+    authentication_classes = [JWTAuthentication, authentication.SessionAuthentication]
+    permission_classes = [IsAuthenticated]
+
+    def get(self, request, *args, **kwargs):
+        wf_id = kwargs["wf_id"]
+        runner = GalaxyRunner()
+        gi = runner.galaxy_instance
+        try:
+            wf = gi.workflows.show_workflow(wf_id)
+            wf_steps_inputs = dict(
+                [
+                    (
+                        step_id,
+                        dict(
+                            [
+                                (input_id, input_val)
+                                for input_id, input_val in step["tool_inputs"].items()
+                                if input_val is not None and type(input_val) is not dict
+                            ]
+                        ),
+                    )
+                    for step_id, step in wf["steps"].items()
+                    if step_id != "0"
+                ]
+            )
+            return Response(wf_steps_inputs)
+        except ConnectionError as e:
+            return Response(
+                status=status.HTTP_400_BAD_REQUEST, data=json.loads(e.body)["err_msg"]
+            )
+
+
 def workflow_default_parameters(request):
     runner = GalaxyRunner()
     gi = runner.galaxy_instance