Skip to content
Snippets Groups Projects
Select Git revision
  • d0fc0ff523636427fe4db486560201d96f926b9c
  • master default protected
  • dev
  • install
  • new_master
  • protein_ortho
  • documentation
  • pr18
  • dev-licence
  • docker
  • prodigal_train
  • containers
  • module_all
  • functional_tests
  • opti
  • helpers
  • v1.4.1
  • v1.4.0
  • v1.3.1
  • v1.3.0
  • v1.2.0
  • v1.1.0
  • v1.0.1
  • v1.0
24 results

alignment.py

Blame
  • clear-sessions-history.py 2.06 KiB
    from datetime import datetime
    from django.core.management.base import BaseCommand
    from django_to_galaxy.models import GalaxyInstance
    from analysis.models import AnalysisHistory, AnalysisWorkflow, AnalysisGalaxyUser
    from django.conf import settings
    
    
    class Command(BaseCommand):
        help = "Clear the history depending on sessions"
    
        def add_arguments(self, parser):
            parser.add_argument(
                "--galaxyemail",
                action="store",
                required=True,
                help="The email for the galaxy instance",
            )
    
            # Named (optional) arguments
            parser.add_argument(
                "--galaxyinstance",
                action="store",
                required=True,
                help="The name of the galaxy instance",
            )
    
        def handle(self, *args, **options):
            gi = GalaxyInstance.objects.get(name=options["galaxyinstance"])
            gu = AnalysisGalaxyUser.objects.filter(
                email=options["galaxyemail"],
                galaxy_instance=gi,
            ).first()
    
            histories = AnalysisHistory.objects.filter(
                analysis_owner=gu, session__expire_date__lt=datetime.now().astimezone()
            )
            for h in histories:
                print(f"Delete expire history {h.name} expired : {h.session.expire_date}")
    
                try:
                    h.analysis_owner.obj_gi.histories.delete(id_=h.galaxy_id, purge=True)
                except ConnectionError as e:
                    print(e)
                    raise e
                else:
                    h.delete()
    
            # clean orphan galaxy histories
            histories = gu.obj_gi.gi.histories.get_histories()
            histories_from_host = [
                h for h in histories if settings.DF_HOSTNAME in h["tags"]
            ]
            for h in histories_from_host:
                galaxy_id = h["id"]
                # check if in database.
                # if not, purge it
                history = AnalysisHistory.objects.filter(galaxy_id=galaxy_id)
                if not history.exists():
                    print(f"Delete orphan history {h['name']}.")
                    gu.obj_gi.gi.histories.delete_history(galaxy_id, purge=True)