From 80ef5194436312b6e5562d470a2dc3e86b51a7cc Mon Sep 17 00:00:00 2001
From: JostTim <44769559+JostTim@users.noreply.github.com>
Date: Wed, 27 Mar 2024 15:28:45 +0100
Subject: [PATCH] wip on laptop

---
 src/pypelines/celery_tasks.py | 176 +++++++++++++++++++++-------------
 src/pypelines/loggs.py        |  81 +++++-----------
 2 files changed, 134 insertions(+), 123 deletions(-)

diff --git a/src/pypelines/celery_tasks.py b/src/pypelines/celery_tasks.py
index ad702f3..6e034cf 100644
--- a/src/pypelines/celery_tasks.py
+++ b/src/pypelines/celery_tasks.py
@@ -27,83 +27,48 @@ class CeleryAlyxTaskManager(BaseStepTaskManager):
         from one import ONE
 
         connector = ONE(mode="remote", data_access_mode="remote")
-        task = TaskRecord(task_id)
+        task = CeleryTaskRecord(task_id)
 
         try:
-            session = connector.search(id=task.session, details=True, no_cache=True)
+            session = connector.search(
+                id=task["session"], details=True, no_cache=True)
             with LogTask(task) as log_object:
                 logger = log_object.logger
-                task.log = log_object.filename
-                task.status = "Started"
-                task = TaskRecord(connector.alyx.rest("tasks", "partial_update", **task.export()))
+                task["log"] = log_object.filename
+                task["status"] = "Started"
+                task.partial_update()
 
                 try:
-                    self.step.generate(session, extra=extra, skip=True, check_requirements=True, **task.arguments)
-                    task.status = self.status_from_logs(log_object)
+                    self.step.generate(
+                        session, extra=extra, skip=True, check_requirements=True, **task.arguments)
+                    task.status_from_logs(log_object)
                 except Exception as e:
                     traceback_msg = format_traceback_exc()
                     logger.critical(f"Fatal Error : {e}")
                     logger.critical("Traceback :\n" + traceback_msg)
-                    task.status = "Failed"
+                    task["status"] = "Failed"
 
         except Exception as e:
             # if it fails outside of the nested try statement, we can't store log files,
             # so we report the failure through alyx directly.
-            task.status = "Uncatched_Fail"
-            task.log = str(e)
+            task["status"] = "Uncatched_Fail"
+            task["log"] = str(e)
 
-        connector.alyx.rest("tasks", "partial_update", **task.export())
+        task.partial_update()
 
     def start(self, session, extra=None, **kwargs):
         if not self.backend:
             raise NotImplementedError(
-                "Cannot use this feature with a pipeline that doesn't have an implemented and working runner backend"
+                "Cannot start a task on a celery cluster as this pipeline "
+                "doesn't have a working celery backend"
             )
 
-        from one import ONE
-
-        connector = ONE(mode="remote", data_access_mode="remote")
+        return CeleryTaskRecord.create(self, session, extra, **kwargs)
 
-        worker = self.backend.app.tasks[self.step.complete_name]
-
-        TaskRecord()
-
-        task_dict = connector.alyx.rest(
-            "tasks",
-            "create",
-            data={
-                "session": session.name,
-                "name": self.step.complete_name,
-                "arguments": kwargs,
-                "status": "Waiting",
-                "executable": self.backend.app_name,
-            },
-        )
-        response_handle = worker.delay(task_dict["id"], extra=extra)
-        # launch the task on the server, and waits until available.
-        return RemoteTask(task_dict, response_handle)
-
-    @staticmethod
-    def status_from_logs(log_object):
-        with open(log_object.fullpath, "r") as f:
-            content = f.read()
-
-        if len(content) == 0:
-            return "No_Info"
-        if "CRITICAL" in content:
-            return "Failed"
-        elif "ERROR" in content:
-            return "Errors"
-        elif "WARNING" in content:
-            return "Warnings"
-        else:
-            return "Complete"
-
-
-class TaskRecord(dict):
+class CeleryTaskRecord(dict):
     # a class to make dictionary keys accessible with attribute syntax
     def __init__(self, task_id, task_infos_dict={}, response_handle=None):
         if task_infos_dict:
@@ -115,35 +80,62 @@ class TaskRecord(dict):
             task_infos_dict = connector.alyx.rest("tasks", "read", id=task_id)
 
         super().__init__(task_infos_dict)
+        self.response = response_handle
 
+    def status_from_logs(self, log_object):
+        with open(log_object.fullpath, "r") as f:
+            content = f.read()
+
+        if len(content) == 0:
+            status = "No_Info"
+        elif "CRITICAL" in content:
+            status = "Failed"
+        elif "ERROR" in content:
+            status = "Errors"
+        elif "WARNING" in content:
+            status = "Warnings"
+        else:
+            status = "Complete"
+
+        self["status"] = status
+
+    def partial_update(self):
+        from one import ONE
+        connector = ONE(mode="remote", data_access_mode="remote")
+        connector.alyx.rest("tasks", "partial_update", **self.export())
+
     @property
     def arguments(self):
        args = self.get("arguments", {})
        return args if args else {}
 
+    @property
+    def session_path(self):
+        from one import ONE
+        connector = ONE(mode="remote", data_access_mode="remote")
+        # resolve the session eid stored on the record to a local path
+        return str(connector.eid2path(self["session"]))
+
     def export(self):
         return {"id": self["id"], "data": {k: v for k, v in self.items() if k not in ["id", "session_path"]}}
 
     @staticmethod
-    def create(step: "BaseStep", session, backend, extra=None, **kwargs):
+    def create(task_manager: CeleryAlyxTaskManager, session, extra=None, **kwargs):
         from one import ONE
 
         connector = ONE(mode="remote", data_access_mode="remote")
 
         data = {
             "session": session.name,
-            "name": step.complete_name,
+            "name": task_manager.step.complete_name,
             "arguments": kwargs,
             "status": "Waiting",
-            "executable": backend.app_name,
+            "executable": task_manager.backend.app_name,
         }
 
-        connector.alyx.rest("tasks", "create", data=data)
         task_dict = connector.alyx.rest("tasks", "create", data=data)
 
-        worker = backend.app.tasks[step.complete_name]
-
+        worker = task_manager.backend.app.tasks[task_manager.step.complete_name]
         response_handle = worker.delay(task_dict["id"], extra=extra)
 
         return CeleryTaskRecord(task_dict["id"], task_dict, response_handle)
@@ -186,6 +178,56 @@ def get_setting_files_path(conf_path, app_name) -> List[Path]:
     return files
 
 
+class LogTask:
+    def __init__(self, task_record, username="", level="LOAD"):
+        self.path = os.path.normpath(task_record.session_path)
+        self.username = username
+        os.makedirs(self.path, exist_ok=True)
+        self.worker_pk = task_record["id"]
+        self.task_name = task_record["name"]
+        self.level = getattr(logging, level.upper())
+
+    def __enter__(self):
+        self.logger = logging.getLogger()
+        self.set_handler()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.remove_handler()
+
+    def set_handler(self):
+        self.filename = os.path.join(
+            "logs", f"task_log.{self.task_name}.{self.worker_pk}.log")
+        self.fullpath = os.path.join(self.path, self.filename)
+        fh = logging.FileHandler(self.fullpath)
+        f_formater = FileFormatter()
+        coloredlogs.HostNameFilter.install(
+            fmt=f_formater.FORMAT,
+            handler=fh,
+            style=f_formater.STYLE,
+            use_chroot=True,
+        )
+        coloredlogs.ProgramNameFilter.install(
+            fmt=f_formater.FORMAT,
+            handler=fh,
+            programname=self.task_name,
+            style=f_formater.STYLE,
+        )
+        coloredlogs.UserNameFilter.install(
+            fmt=f_formater.FORMAT,
+            handler=fh,
+            username=self.username,
+            style=f_formater.STYLE,
+        )
+
+        fh.setLevel(self.level)
+        fh.setFormatter(f_formater)
+        self.logger.addHandler(fh)
+
+    def remove_handler(self):
+        self.logger.removeHandler(self.logger.handlers[-1])
+
+
 def create_celery_app(conf_path, app_name="pypelines"):
 
     failure_message = (
@@ -198,19 +240,22 @@ def create_celery_app(conf_path, app_name="pypelines"):
     settings_files = get_setting_files_path(conf_path, app_name)
 
     if len(settings_files) == 0:
-        logger.warning(f"{failure_message} Could not find celery toml config files.")
+        logger.warning(
+            f"{failure_message} Could not find celery toml config files.")
         return None
 
     try:
         from dynaconf import Dynaconf
     except ImportError:
-        logger.warning(f"{failure_message} Could not import dynaconf. Maybe it is not istalled ?")
+        logger.warning(
+            f"{failure_message} Could not import dynaconf. Maybe it is not installed?")
         return None
 
     try:
         settings = Dynaconf(settings_files=settings_files)
     except Exception as e:
-        logger.warning(f"{failure_message} Could not create dynaconf object. {e}")
+        logger.warning(
+            f"{failure_message} Could not create dynaconf object. {e}")
         return None
 
     try:
@@ -228,7 +273,8 @@ def create_celery_app(conf_path, app_name="pypelines"):
     try:
         from celery import Celery
     except ImportError:
-        logger.warning(f"{failure_message} Could not import celery. Maybe is is not installed ?")
+        logger.warning(
+            f"{failure_message} Could not import celery. Maybe it is not installed?")
         return None
 
     try:
@@ -238,14 +284,16 @@ def create_celery_app(conf_path, app_name="pypelines"):
             backend=f"{backend}://",
         )
     except Exception as e:
-        logger.warning(f"{failure_message} Could not create app. Maybe rabbitmq server @{address} is not running ? {e}")
+        logger.warning(
+            f"{failure_message} Could not create app. Maybe the rabbitmq server @{address} is not running? {e}")
         return None
 
     for key, value in conf_data.items():
         try:
             setattr(app.conf, key, value)
         except Exception as e:
-            logger.warning(f"{failure_message} Could assign extra attribute {key} to celery app. {e}")
+            logger.warning(
+                f"{failure_message} Could not assign extra attribute {key} to the celery app. {e}")
             return None
 
     return app
diff --git a/src/pypelines/loggs.py b/src/pypelines/loggs.py
index 0127455..bfe3fad 100644
--- a/src/pypelines/loggs.py
+++ b/src/pypelines/loggs.py
@@ -1,4 +1,7 @@
-import logging, sys, re, os
+import logging
+import sys
+import re
+import os
 from functools import wraps
 
 import coloredlogs
@@ -126,7 +129,8 @@ class DynamicColoredFormatter(coloredlogs.ColoredFormatter):
         """
         pattern = r"%\((?P<part_name>\w+)\)-?(?P<length>\d+)?[sd]?"
         result = re.findall(pattern, fmt)
-        padding_dict = {name: int(padding) if padding else 0 for name, padding in result}
+        padding_dict = {
+            name: int(padding) if padding else 0 for name, padding in result}
         return padding_dict
 
@@ -155,8 +159,10 @@ class DynamicColoredFormatter(coloredlogs.ColoredFormatter):
             missing_length = 0 if missing_length < 0 else missing_length
             if part_name in self.dynamic_levels.keys():
                 dyn_keys = self.dynamic_levels[part_name]
-                dynamic_style = {k: v for k, v in style.items() if k in dyn_keys or dyn_keys == "all"}
-                part = coloredlogs.ansi_wrap(coloredlogs.coerce_string(part), **dynamic_style)
+                dynamic_style = {
+                    k: v for k, v in style.items() if k in dyn_keys or dyn_keys == "all"}
+                part = coloredlogs.ansi_wrap(
+                    coloredlogs.coerce_string(part), **dynamic_style)
                 part = part + (" " * missing_length)
             setattr(copy, part_name, part)
         record = copy
@@ -234,55 +240,6 @@ class FileFormatter(SugarColoredFormatter):
     FORMAT = f"[%(asctime)s] %(hostname)s %(levelname)-{LEVELLENGTH}s : %(name)-{NAMELENGTH}s : %(message)s"
 
 
-class LogTask:
-    def __init__(self, task_record, username="", level="LOAD"):
-        self.path = os.path.normpath(task_record.session_path)
-        self.username = username
-        os.makedirs(self.path, exist_ok=True)
-        self.worker_pk = task_record.id
-        self.task_name = task_record.name
-        self.level = getattr(logging, level.upper())
-
-    def __enter__(self):
-        self.logger = logging.getLogger()
-        self.set_handler()
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        self.remove_handler()
-
-    def set_handler(self):
-        self.filename = os.path.join("logs", f"task_log.{self.task_name}.{self.worker_pk}.log")
-        self.fullpath = os.path.join(self.path, self.filename)
-        fh = logging.FileHandler(self.fullpath)
-        f_formater = FileFormatter()
-        coloredlogs.HostNameFilter.install(
-            fmt=f_formater.FORMAT,
-            handler=fh,
-            style=f_formater.STYLE,
-            use_chroot=True,
-        )
-        coloredlogs.ProgramNameFilter.install(
-            fmt=f_formater.FORMAT,
-            handler=fh,
-            programname=self.task_name,
-            style=f_formater.STYLE,
-        )
-        coloredlogs.UserNameFilter.install(
-            fmt=f_formater.FORMAT,
-            handler=fh,
-            username=self.username,
-            style=f_formater.STYLE,
-        )
-
-        fh.setLevel(self.level)
-        fh.setFormatter()
-        self.logger.addHandler(fh)
-
-    def remove_handler(self):
-        self.logger.removeHandler(self.logger.handlers[-1])
-
-
 class ContextFilter(logging.Filter):
     """This is a filter which injects contextual information into the log."""
 
@@ -347,7 +304,8 @@ class LogContext:
         for handler in self.root_logger.handlers:
             for filter in handler.filters:
                 if getattr(filter, "context_msg", "") == self.context_msg:
-                    self.root_logger.debug(f"Filter already added to handler {handler}")
+                    self.root_logger.debug(
+                        f"Filter already added to handler {handler}")
                     found = True
                     break
 
@@ -359,7 +317,8 @@ class LogContext:
             context_filter = ContextFilter(self.context_msg)
             handler.addFilter(context_filter)
             self.context_filters[handler] = context_filter
-            self.root_logger.debug(f"Added filter {context_filter} to handler {handler}")
+            self.root_logger.debug(
+                f"Added filter {context_filter} to handler {handler}")
             break
 
     def __exit__(self, exc_type, exc_val, exc_tb):
@@ -375,7 +334,8 @@ class LogContext:
             if filer_to_remove is None:
                 continue
             else:
-                self.root_logger.debug(f"Removing filter {filer_to_remove} from handler {handler} in this context")
+                self.root_logger.debug(
+                    f"Removing filter {filer_to_remove} from handler {handler} in this context")
                 handler.removeFilter(filer_to_remove)
 
 
@@ -452,11 +412,14 @@ def addLoggingLevel(levelName, levelNum, methodName=None, if_exists="raise"):
     if if_exists == "keep":
         return
     if hasattr(logging, levelName):
-        raise AttributeError("{} already defined in logging module".format(levelName))
+        raise AttributeError(
+            "{} already defined in logging module".format(levelName))
     if hasattr(logging, methodName):
-        raise AttributeError("{} already defined in logging module".format(methodName))
+        raise AttributeError(
+            "{} already defined in logging module".format(methodName))
     if hasattr(logging.getLoggerClass(), methodName):
-        raise AttributeError("{} already defined in logger class".format(methodName))
+        raise AttributeError(
+            "{} already defined in logger class".format(methodName))
 
     # This method was inspired by the answers to Stack Overflow post
     # http://stackoverflow.com/q/2183233/2988730, especially
-- 
GitLab
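
A minimal usage sketch of the reworked flow this patch introduces, assuming
`manager` is an already-configured CeleryAlyxTaskManager and `session` is an
alyx session object exposing a `.name` attribute (both hypothetical stand-ins;
`my_argument` is an arbitrary example kwarg forwarded to the step, and
everything else is defined in the patch above):

    # client side: enqueue a step on the celery cluster
    task = manager.start(session, extra=None, my_argument=42)  # returns a CeleryTaskRecord
    print(task["status"])  # "Waiting", as set on alyx at creation time
    print(task.response)   # the celery AsyncResult returned by worker.delay(...)

    # worker side: run() re-reads the record by its id and pushes updates back to alyx
    task = CeleryTaskRecord(task["id"])
    task["status"] = "Started"
    task.partial_update()  # sends the record to the alyx "tasks" partial_update endpoint via ONE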