From 51a5a639444add4522e1af44196d8f5d25698526 Mon Sep 17 00:00:00 2001 From: Brett Date: Thu, 8 Aug 2024 23:01:26 -0500 Subject: [PATCH] Use pubsub messages instead of a background thread to process changes (#92) * Use pubsub messages instead of a background thread to process changes to the RenderQueue * Misc logging improvements --- src/engines/core/base_worker.py | 2 +- src/render_queue.py | 29 +++++++++++++++-------------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/src/engines/core/base_worker.py b/src/engines/core/base_worker.py index e64f349..c1fbe75 100644 --- a/src/engines/core/base_worker.py +++ b/src/engines/core/base_worker.py @@ -103,7 +103,7 @@ class BaseRenderWorker(Base): self.watchdog_timeout = 120 def __repr__(self): - return f"<{self.__class__.__name__}|{self.id}|{self.name}|{self.status}|{self.input_path}>" + return f"" @property def total_frames(self): diff --git a/src/render_queue.py b/src/render_queue.py index c110692..e7cb504 100755 --- a/src/render_queue.py +++ b/src/render_queue.py @@ -1,9 +1,8 @@ import logging import os -import threading -import time from datetime import datetime +from pubsub import pub from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.orm.exc import DetachedInstanceError @@ -39,21 +38,20 @@ class RenderQueue: @classmethod def start(cls): + logger.debug("Starting render queue updates") cls.is_running = True - cls.__eval_thread = threading.Thread(target=cls.__eval_loop, daemon=True) - cls.__eval_thread.start() + cls.evaluate_queue() @classmethod - def __eval_loop(cls): - while cls.is_running: - try: - RenderQueue.evaluate_queue() - except Exception as e: - logger.exception(f"Uncaught error while evaluating queue: {e}") - time.sleep(cls.evaluation_inverval) + def __local_job_status_changed(cls, job_id, old_status, new_status): + render_job = RenderQueue.job_with_id(job_id, none_ok=True) + if render_job and cls.is_running: # ignore changes from render jobs not in the queue yet + logger.debug(f"RenderQueue detected job {job_id} has changed from {old_status} -> {new_status}") + RenderQueue.evaluate_queue() @classmethod def stop(cls): + logger.debug("Stopping render queue updates") cls.is_running = False # -------------------------------------------- @@ -62,12 +60,14 @@ class RenderQueue: @classmethod def add_to_render_queue(cls, render_job, force_start=False): - logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job)) + logger.info(f"Adding job to render queue: {render_job}") cls.job_queue.append(render_job) - if force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED): + if cls.is_running and force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED): cls.start_job(render_job) cls.session.add(render_job) cls.save_state() + if cls.is_running: + cls.evaluate_queue() @classmethod def all_jobs(cls): @@ -113,6 +113,7 @@ class RenderQueue: cls.session = sessionmaker(bind=cls.engine)() from src.engines.core.base_worker import BaseRenderWorker cls.job_queue = cls.session.query(BaseRenderWorker).all() + pub.subscribe(cls.__local_job_status_changed, 'status_change') @classmethod def save_state(cls): @@ -157,7 +158,7 @@ class RenderQueue: @classmethod def start_job(cls, job): - logger.info(f'Starting job: {job} - Priority {job.priority}') + logger.info(f'Starting job: {job}') job.start() cls.save_state()