Skip to content
Snippets Groups Projects
Commit 519abaa6 authored by Timothe Jost's avatar Timothe Jost
Browse files

fixing celery erroring on install if conf files are not present - now throws a warning

parent 488602cb
No related branches found
No related tags found
No related merge requests found
Pipeline #125375 passed
__version__ = "0.0.23" __version__ = "0.0.24"
from . import loggs from . import loggs
from .pipes import * from .pipes import *
......
from typing import Callable, Type, Dict, Iterable, Protocol, TYPE_CHECKING from typing import Callable, Type, Dict, Iterable, Protocol, TYPE_CHECKING
from logging import getLogger
import os import os
if TYPE_CHECKING: if TYPE_CHECKING:
...@@ -142,8 +142,16 @@ class Pipeline: ...@@ -142,8 +142,16 @@ class Pipeline:
def configure_celery(self) -> None: def configure_celery(self) -> None:
from .tasks import CeleryHandler from .tasks import CeleryHandler
self.celery = CeleryHandler(self.conf_path, self.pipeline_name) celery = CeleryHandler(self.conf_path, self.pipeline_name)
self.use_celery = True if celery.success:
self.self.celery = celery
self.use_celery = True
else:
getLogger().warning(
f"Could not initialize celery for the pipeline {self.pipeline_name}."
"Don't worry, about this alert, "
"this is not be an issue if you didn't explicitely planned on using celery."
)
def finalize(self): def finalize(self):
if self.use_celery: if self.use_celery:
......
...@@ -12,24 +12,53 @@ class CeleryHandler: ...@@ -12,24 +12,53 @@ class CeleryHandler:
settings = None settings = None
app = None app = None
app_name = None app_name = None
success: bool = False
def __init__(self, conf_path, pipeline_name): def __init__(self, conf_path, pipeline_name):
logger = getLogger()
settings_files = self.get_setting_files_path(conf_path, pipeline_name) settings_files = self.get_setting_files_path(conf_path, pipeline_name)
self.settings = Dynaconf(settings_files=settings_files)
self.app_name = self.settings.get("app_name", pipeline_name) if any([not os.path.isfile(file) for file in settings_files]):
self.app = Celery( logger.warning(f"Some celery configuration files were missing for pipeline {pipeline_name}")
self.app_name, return
broker=(
f"{self.settings.connexion.broker_type}://" try:
f"{self.settings.account}:{self.settings.password}@{self.settings.address}//" self.settings = Dynaconf(settings_files=settings_files)
),
backend=f"{self.settings.connexion.backend}://", self.app_name = self.settings.get("app_name", pipeline_name)
) broker_type = self.settings.connexion.broker_type
account = self.settings.account
for key, value in self.settings.conf.items(): password = self.settings.password
address = self.settings.address
backend = self.settings.connexion.backend
conf_data = self.settings.conf
except Exception as e:
logger.warning(
"Could not get all necessary information to configure celery when reading config files."
"Check their content."
)
return
try:
self.app = Celery(
self.app_name,
broker=(f"{broker_type}://" f"{account}:{password}@{address}//"),
backend=f"{backend}://",
)
except Exception as e:
logger.warning("Instanciating celery app failed. Maybe rabbitmq is not running ?")
for key, value in conf_data.items():
setattr(self.app.conf, key, value) setattr(self.app.conf, key, value)
self.connector = ONE(data_access_mode="remote") try:
self.connector = ONE(data_access_mode="remote")
except Exception as e:
logger.warning("Instanciating One connector during celery configuration failed.")
return
self.success = True
def get_setting_files_path(self, conf_path, pipeline_name): def get_setting_files_path(self, conf_path, pipeline_name):
files = [] files = []
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment