diff --git a/src/pypelines/__init__.py b/src/pypelines/__init__.py index ba31a3edbab453d00306e54004b4acbc9c2da172..edf9cfb16f3e07ba05c0b7543b47eae3b1beedef 100644 --- a/src/pypelines/__init__.py +++ b/src/pypelines/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.0.23" +__version__ = "0.0.24" from . import loggs from .pipes import * diff --git a/src/pypelines/pipelines.py b/src/pypelines/pipelines.py index 39d48af5235710abc5b76fb691d4197e5ee08a8e..842e88598f011e782e4a1eb798dc5d9d5a6c916e 100644 --- a/src/pypelines/pipelines.py +++ b/src/pypelines/pipelines.py @@ -1,5 +1,5 @@ from typing import Callable, Type, Dict, Iterable, Protocol, TYPE_CHECKING - +from logging import getLogger import os if TYPE_CHECKING: @@ -142,8 +142,16 @@ class Pipeline: def configure_celery(self) -> None: from .tasks import CeleryHandler - self.celery = CeleryHandler(self.conf_path, self.pipeline_name) - self.use_celery = True + celery = CeleryHandler(self.conf_path, self.pipeline_name) + if celery.success: + self.self.celery = celery + self.use_celery = True + else: + getLogger().warning( + f"Could not initialize celery for the pipeline {self.pipeline_name}." + "Don't worry, about this alert, " + "this is not be an issue if you didn't explicitely planned on using celery." + ) def finalize(self): if self.use_celery: diff --git a/src/pypelines/tasks.py b/src/pypelines/tasks.py index 8408dd12e592d537e85d2df67e70e154c7b065c5..6e651e4e4abfe68e00eded180f66c456a3bd5fe9 100644 --- a/src/pypelines/tasks.py +++ b/src/pypelines/tasks.py @@ -12,24 +12,53 @@ class CeleryHandler: settings = None app = None app_name = None + success: bool = False def __init__(self, conf_path, pipeline_name): + logger = getLogger() settings_files = self.get_setting_files_path(conf_path, pipeline_name) - self.settings = Dynaconf(settings_files=settings_files) - self.app_name = self.settings.get("app_name", pipeline_name) - self.app = Celery( - self.app_name, - broker=( - f"{self.settings.connexion.broker_type}://" - f"{self.settings.account}:{self.settings.password}@{self.settings.address}//" - ), - backend=f"{self.settings.connexion.backend}://", - ) - - for key, value in self.settings.conf.items(): + + if any([not os.path.isfile(file) for file in settings_files]): + logger.warning(f"Some celery configuration files were missing for pipeline {pipeline_name}") + return + + try: + self.settings = Dynaconf(settings_files=settings_files) + + self.app_name = self.settings.get("app_name", pipeline_name) + broker_type = self.settings.connexion.broker_type + account = self.settings.account + password = self.settings.password + address = self.settings.address + backend = self.settings.connexion.backend + conf_data = self.settings.conf + + except Exception as e: + logger.warning( + "Could not get all necessary information to configure celery when reading config files." + "Check their content." + ) + return + + try: + self.app = Celery( + self.app_name, + broker=(f"{broker_type}://" f"{account}:{password}@{address}//"), + backend=f"{backend}://", + ) + except Exception as e: + logger.warning("Instanciating celery app failed. Maybe rabbitmq is not running ?") + + for key, value in conf_data.items(): setattr(self.app.conf, key, value) - self.connector = ONE(data_access_mode="remote") + try: + self.connector = ONE(data_access_mode="remote") + except Exception as e: + logger.warning("Instanciating One connector during celery configuration failed.") + return + + self.success = True def get_setting_files_path(self, conf_path, pipeline_name): files = []