diff --git a/src/pypelines/__init__.py b/src/pypelines/__init__.py index c8f6ef2e673380f4795d4e6c93a54838eccd33b4..e56db64febfd638575f764e8a69303b2c6143b45 100644 --- a/src/pypelines/__init__.py +++ b/src/pypelines/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.0.78" +__version__ = "0.0.79" from . import loggs from .pipes import * diff --git a/src/pypelines/celery_tasks.py b/src/pypelines/celery_tasks.py index f4d771a030ca217a5a488ab9b768de8b478ded8f..7f79a0ef23eccb4d4b26911770f9a1bc82deafa5 100644 --- a/src/pypelines/celery_tasks.py +++ b/src/pypelines/celery_tasks.py @@ -8,6 +8,8 @@ import coloredlogs from logging import getLogger from platform import node from pandas import Series +import platform +from threading import Thread from typing import TYPE_CHECKING, List @@ -832,6 +834,10 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | logger.error(f"No handshake result. All workers are busy ? {e}") return False + def single_worker_start(self: Celery): + thread = CeleryWorkerThread(self) + thread.start() + settings_files = get_setting_files_path(conf_path) if len(settings_files) == 0: @@ -893,9 +899,27 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | app.get_celery_app_tasks = MethodType(get_celery_app_tasks, app) # type: ignore app.launch_named_task_remotely = MethodType(launch_named_task_remotely, app) # type: ignore app.is_hand_shaken = MethodType(is_hand_shaken, app) # type: ignore + app.single_worker_start = MethodType(single_worker_start, app) logger.info(f"The celery app {app_name} was created successfully.") APPLICATIONS_STORE[app_name] = app return app + + +class CeleryWorkerThread(Thread): + def __init__(self, app: Celery): + super().__init__() + self.app = app + + def run(self): + self.app.worker_main(argv=["worker", "--loglevel=INFO", "--concurrency=1", "--pool=solo"]) + # self.app.start() + + def stop(self): + worker_stats = self.app.control.inspect().stats() + worker_names = worker_stats.keys() if worker_stats else [] + current_node_name = f"celery@{platform.node()}" + if current_node_name in worker_names: + self.app.control.broadcast("shutdown", destination=[current_node_name])