From 49dd60ae15d19e04d5113a376d21b6a0c1349319 Mon Sep 17 00:00:00 2001 From: Timothe Jost <timothe.jost@wanadoo.fr> Date: Thu, 20 Mar 2025 18:42:38 +0100 Subject: [PATCH] adding a fast worker creation method to the celery app --- src/pypelines/__init__.py | 2 +- src/pypelines/celery_tasks.py | 24 ++++++++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/src/pypelines/__init__.py b/src/pypelines/__init__.py index c8f6ef2..e56db64 100644 --- a/src/pypelines/__init__.py +++ b/src/pypelines/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.0.78" +__version__ = "0.0.79" from . import loggs from .pipes import * diff --git a/src/pypelines/celery_tasks.py b/src/pypelines/celery_tasks.py index f4d771a..7f79a0e 100644 --- a/src/pypelines/celery_tasks.py +++ b/src/pypelines/celery_tasks.py @@ -8,6 +8,8 @@ import coloredlogs from logging import getLogger from platform import node from pandas import Series +import platform +from threading import Thread from typing import TYPE_CHECKING, List @@ -832,6 +834,10 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | logger.error(f"No handshake result. All workers are busy ? {e}") return False + def single_worker_start(self: Celery): + thread = CeleryWorkerThread(self) + thread.start() + settings_files = get_setting_files_path(conf_path) if len(settings_files) == 0: @@ -893,9 +899,27 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | app.get_celery_app_tasks = MethodType(get_celery_app_tasks, app) # type: ignore app.launch_named_task_remotely = MethodType(launch_named_task_remotely, app) # type: ignore app.is_hand_shaken = MethodType(is_hand_shaken, app) # type: ignore + app.single_worker_start = MethodType(single_worker_start, app) logger.info(f"The celery app {app_name} was created successfully.") APPLICATIONS_STORE[app_name] = app return app + + +class CeleryWorkerThread(Thread): + def __init__(self, app: Celery): + super().__init__() + self.app = app + + def run(self): + self.app.worker_main(argv=["worker", "--loglevel=INFO", "--concurrency=1", "--pool=solo"]) + # self.app.start() + + def stop(self): + worker_stats = self.app.control.inspect().stats() + worker_names = worker_stats.keys() if worker_stats else [] + current_node_name = f"celery@{platform.node()}" + if current_node_name in worker_names: + self.app.control.broadcast("shutdown", destination=[current_node_name]) -- GitLab