From 519abaa61dd5f0c3248af70e0d900ad3a6bacb36 Mon Sep 17 00:00:00 2001
From: Timothe Jost <timothe.jost@wanadoo.fr>
Date: Tue, 5 Mar 2024 16:17:09 +0100
Subject: [PATCH] fixing celery erroring on install if conf files are not
 present - now throws a warning

---
 src/pypelines/__init__.py  |  2 +-
 src/pypelines/pipelines.py | 14 +++++++---
 src/pypelines/tasks.py     | 55 +++++++++++++++++++++++++++++---------
 3 files changed, 54 insertions(+), 17 deletions(-)

diff --git a/src/pypelines/__init__.py b/src/pypelines/__init__.py
index ba31a3e..edf9cfb 100644
--- a/src/pypelines/__init__.py
+++ b/src/pypelines/__init__.py
@@ -1,4 +1,4 @@
-__version__ = "0.0.23"
+__version__ = "0.0.24"
 
 from . import loggs
 from .pipes import *
diff --git a/src/pypelines/pipelines.py b/src/pypelines/pipelines.py
index 39d48af..842e885 100644
--- a/src/pypelines/pipelines.py
+++ b/src/pypelines/pipelines.py
@@ -1,5 +1,5 @@
 from typing import Callable, Type, Dict, Iterable, Protocol, TYPE_CHECKING
-
+from logging import getLogger
 import os
 
 if TYPE_CHECKING:
@@ -142,8 +142,16 @@ class Pipeline:
     def configure_celery(self) -> None:
         from .tasks import CeleryHandler
 
-        self.celery = CeleryHandler(self.conf_path, self.pipeline_name)
-        self.use_celery = True
+        celery = CeleryHandler(self.conf_path, self.pipeline_name)
+        if celery.success:
+            self.self.celery = celery
+            self.use_celery = True
+        else:
+            getLogger().warning(
+                f"Could not initialize celery for the pipeline {self.pipeline_name}."
+                "Don't worry, about this alert, "
+                "this is not be an issue if you didn't explicitely planned on using celery."
+            )
 
     def finalize(self):
         if self.use_celery:
diff --git a/src/pypelines/tasks.py b/src/pypelines/tasks.py
index 8408dd1..6e651e4 100644
--- a/src/pypelines/tasks.py
+++ b/src/pypelines/tasks.py
@@ -12,24 +12,53 @@ class CeleryHandler:
     settings = None
     app = None
     app_name = None
+    success: bool = False
 
     def __init__(self, conf_path, pipeline_name):
+        logger = getLogger()
         settings_files = self.get_setting_files_path(conf_path, pipeline_name)
-        self.settings = Dynaconf(settings_files=settings_files)
-        self.app_name = self.settings.get("app_name", pipeline_name)
-        self.app = Celery(
-            self.app_name,
-            broker=(
-                f"{self.settings.connexion.broker_type}://"
-                f"{self.settings.account}:{self.settings.password}@{self.settings.address}//"
-            ),
-            backend=f"{self.settings.connexion.backend}://",
-        )
-
-        for key, value in self.settings.conf.items():
+
+        if any([not os.path.isfile(file) for file in settings_files]):
+            logger.warning(f"Some celery configuration files were missing for pipeline {pipeline_name}")
+            return
+
+        try:
+            self.settings = Dynaconf(settings_files=settings_files)
+
+            self.app_name = self.settings.get("app_name", pipeline_name)
+            broker_type = self.settings.connexion.broker_type
+            account = self.settings.account
+            password = self.settings.password
+            address = self.settings.address
+            backend = self.settings.connexion.backend
+            conf_data = self.settings.conf
+
+        except Exception as e:
+            logger.warning(
+                "Could not get all necessary information to configure celery when reading config files."
+                "Check their content."
+            )
+            return
+
+        try:
+            self.app = Celery(
+                self.app_name,
+                broker=(f"{broker_type}://" f"{account}:{password}@{address}//"),
+                backend=f"{backend}://",
+            )
+        except Exception as e:
+            logger.warning("Instanciating celery app failed. Maybe rabbitmq is not running ?")
+
+        for key, value in conf_data.items():
             setattr(self.app.conf, key, value)
 
-        self.connector = ONE(data_access_mode="remote")
+        try:
+            self.connector = ONE(data_access_mode="remote")
+        except Exception as e:
+            logger.warning("Instanciating One connector during celery configuration failed.")
+            return
+
+        self.success = True
 
     def get_setting_files_path(self, conf_path, pipeline_name):
         files = []
-- 
GitLab