Skip to content
Snippets Groups Projects
Commit 49dd60ae authored by Timothe Jost's avatar Timothe Jost
Browse files

adding a fast worker creation method to the celery app

parent 59d460b0
No related branches found
Tags v11.1.0
No related merge requests found
Pipeline #152669 passed
__version__ = "0.0.78" __version__ = "0.0.79"
from . import loggs from . import loggs
from .pipes import * from .pipes import *
......
...@@ -8,6 +8,8 @@ import coloredlogs ...@@ -8,6 +8,8 @@ import coloredlogs
from logging import getLogger from logging import getLogger
from platform import node from platform import node
from pandas import Series from pandas import Series
import platform
from threading import Thread
from typing import TYPE_CHECKING, List from typing import TYPE_CHECKING, List
...@@ -832,6 +834,10 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | ...@@ -832,6 +834,10 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery |
logger.error(f"No handshake result. All workers are busy ? {e}") logger.error(f"No handshake result. All workers are busy ? {e}")
return False return False
def single_worker_start(self: Celery):
thread = CeleryWorkerThread(self)
thread.start()
settings_files = get_setting_files_path(conf_path) settings_files = get_setting_files_path(conf_path)
if len(settings_files) == 0: if len(settings_files) == 0:
...@@ -893,9 +899,27 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | ...@@ -893,9 +899,27 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery |
app.get_celery_app_tasks = MethodType(get_celery_app_tasks, app) # type: ignore app.get_celery_app_tasks = MethodType(get_celery_app_tasks, app) # type: ignore
app.launch_named_task_remotely = MethodType(launch_named_task_remotely, app) # type: ignore app.launch_named_task_remotely = MethodType(launch_named_task_remotely, app) # type: ignore
app.is_hand_shaken = MethodType(is_hand_shaken, app) # type: ignore app.is_hand_shaken = MethodType(is_hand_shaken, app) # type: ignore
app.single_worker_start = MethodType(single_worker_start, app)
logger.info(f"The celery app {app_name} was created successfully.") logger.info(f"The celery app {app_name} was created successfully.")
APPLICATIONS_STORE[app_name] = app APPLICATIONS_STORE[app_name] = app
return app return app
class CeleryWorkerThread(Thread):
def __init__(self, app: Celery):
super().__init__()
self.app = app
def run(self):
self.app.worker_main(argv=["worker", "--loglevel=INFO", "--concurrency=1", "--pool=solo"])
# self.app.start()
def stop(self):
worker_stats = self.app.control.inspect().stats()
worker_names = worker_stats.keys() if worker_stats else []
current_node_name = f"celery@{platform.node()}"
if current_node_name in worker_names:
self.app.control.broadcast("shutdown", destination=[current_node_name])
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment