Use pubsub messages instead of a background thread to process changes (#92)

* Use pubsub messages instead of a background thread to process changes to the RenderQueue

* Misc logging improvements
This commit is contained in:
2024-08-08 23:01:26 -05:00
committed by GitHub
parent 3600eeb21b
commit 51a5a63944
2 changed files with 16 additions and 15 deletions

View File

@@ -103,7 +103,7 @@ class BaseRenderWorker(Base):
self.watchdog_timeout = 120 self.watchdog_timeout = 120
def __repr__(self): def __repr__(self):
return f"<{self.__class__.__name__}|{self.id}|{self.name}|{self.status}|{self.input_path}>" return f"<Job id:{self.id} p{self.priority} {self.renderer}-{self.renderer_version} '{self.name}' status:{self.status.value}>"
@property @property
def total_frames(self): def total_frames(self):

View File

@@ -1,9 +1,8 @@
import logging import logging
import os import os
import threading
import time
from datetime import datetime from datetime import datetime
from pubsub import pub
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.exc import DetachedInstanceError from sqlalchemy.orm.exc import DetachedInstanceError
@@ -39,21 +38,20 @@ class RenderQueue:
@classmethod @classmethod
def start(cls): def start(cls):
logger.debug("Starting render queue updates")
cls.is_running = True cls.is_running = True
cls.__eval_thread = threading.Thread(target=cls.__eval_loop, daemon=True) cls.evaluate_queue()
cls.__eval_thread.start()
@classmethod @classmethod
def __eval_loop(cls): def __local_job_status_changed(cls, job_id, old_status, new_status):
while cls.is_running: render_job = RenderQueue.job_with_id(job_id, none_ok=True)
try: if render_job and cls.is_running: # ignore changes from render jobs not in the queue yet
logger.debug(f"RenderQueue detected job {job_id} has changed from {old_status} -> {new_status}")
RenderQueue.evaluate_queue() RenderQueue.evaluate_queue()
except Exception as e:
logger.exception(f"Uncaught error while evaluating queue: {e}")
time.sleep(cls.evaluation_inverval)
@classmethod @classmethod
def stop(cls): def stop(cls):
logger.debug("Stopping render queue updates")
cls.is_running = False cls.is_running = False
# -------------------------------------------- # --------------------------------------------
@@ -62,12 +60,14 @@ class RenderQueue:
@classmethod @classmethod
def add_to_render_queue(cls, render_job, force_start=False): def add_to_render_queue(cls, render_job, force_start=False):
logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job)) logger.info(f"Adding job to render queue: {render_job}")
cls.job_queue.append(render_job) cls.job_queue.append(render_job)
if force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED): if cls.is_running and force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED):
cls.start_job(render_job) cls.start_job(render_job)
cls.session.add(render_job) cls.session.add(render_job)
cls.save_state() cls.save_state()
if cls.is_running:
cls.evaluate_queue()
@classmethod @classmethod
def all_jobs(cls): def all_jobs(cls):
@@ -113,6 +113,7 @@ class RenderQueue:
cls.session = sessionmaker(bind=cls.engine)() cls.session = sessionmaker(bind=cls.engine)()
from src.engines.core.base_worker import BaseRenderWorker from src.engines.core.base_worker import BaseRenderWorker
cls.job_queue = cls.session.query(BaseRenderWorker).all() cls.job_queue = cls.session.query(BaseRenderWorker).all()
pub.subscribe(cls.__local_job_status_changed, 'status_change')
@classmethod @classmethod
def save_state(cls): def save_state(cls):
@@ -157,7 +158,7 @@ class RenderQueue:
@classmethod @classmethod
def start_job(cls, job): def start_job(cls, job):
logger.info(f'Starting job: {job} - Priority {job.priority}') logger.info(f'Starting job: {job}')
job.start() job.start()
cls.save_state() cls.save_state()