import logging import threading from collections import Counter from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional from pubsub import pub from sqlalchemy import create_engine from sqlalchemy.orm import Session, sessionmaker from sqlalchemy.orm.exc import DetachedInstanceError from src.engines.core.base_worker import Base, BaseRenderWorker from src.utilities.status_utils import RenderStatus logger = logging.getLogger() class JobNotFoundError(Exception): def __init__(self, job_id: str, *args: Any) -> None: super().__init__(args) self.job_id = job_id def __str__(self) -> str: return f"Cannot find job with ID: {self.job_id}" class RenderQueue: _default_instance: Optional['RenderQueue'] = None @classmethod def _sync_class(cls) -> None: if cls._default_instance is not None: pass # no class-level attributes to sync def __init__(self) -> None: self.engine: Optional[create_engine] = None self.session: Optional[Session] = None self.job_queue: List[BaseRenderWorker] = [] self.maximum_renderer_instances: Dict[str, int] = {'blender': 1, 'aerender': 1, 'ffmpeg': 4} self.last_saved_counts: Dict[str, int] = {} self.is_running: bool = False self._lock = threading.Lock() # -------------------------------------------- # Render Queue Evaluation: # -------------------------------------------- def _start(self) -> None: logger.debug("Starting render queue updates") self.is_running = True self.evaluate_queue() def _evaluate_queue(self) -> None: try: not_started = self.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True) for job in not_started: if self.is_available_for_job(job.engine_name, job.priority): self.start_job(job) scheduled = self.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True) for job in scheduled: if job.scheduled_start <= datetime.now(): logger.debug(f"Starting scheduled job: {job}") self.start_job(job) if self.last_saved_counts != self.job_counts(): self.save_state() except DetachedInstanceError: pass def _local_job_status_changed(self, job_id: str, old_status: str, new_status: str) -> None: render_job = self.job_with_id(job_id, none_ok=True) if render_job and self.is_running: logger.debug(f"RenderQueue detected job {job_id} has changed from {old_status} -> {new_status}") self.evaluate_queue() def _stop(self) -> None: logger.debug("Stopping render queue updates") self.is_running = False # -------------------------------------------- # Fetch Jobs: # -------------------------------------------- def _all_jobs(self) -> List[BaseRenderWorker]: return self.job_queue def _running_jobs(self) -> List[BaseRenderWorker]: return self.jobs_with_status(RenderStatus.RUNNING) def _pending_jobs(self) -> List[BaseRenderWorker]: pending = self.jobs_with_status(RenderStatus.NOT_STARTED) pending.extend(self.jobs_with_status(RenderStatus.SCHEDULED)) return pending def _jobs_with_status(self, status: RenderStatus, priority_sorted: bool = False) -> List[BaseRenderWorker]: found_jobs = [x for x in self.all_jobs() if x.status == status] if priority_sorted: found_jobs = sorted(found_jobs, key=lambda a: a.priority, reverse=False) return found_jobs def _job_with_id(self, job_id: str, none_ok: bool = False) -> Optional[BaseRenderWorker]: found_job = next((x for x in self.all_jobs() if x.id == job_id), None) if not found_job and not none_ok: raise JobNotFoundError(job_id) return found_job def _job_counts(self) -> Dict[str, int]: counts = Counter(x.status for x in self.all_jobs()) return {s.value: counts.get(s, 0) for s in RenderStatus} # -------------------------------------------- # Startup / Shutdown: # -------------------------------------------- def _load_state(self, database_directory: Path) -> None: self.engine = create_engine(f"sqlite:///{database_directory / 'database.db'}") Base.metadata.create_all(self.engine) self.session = sessionmaker(bind=self.engine)() from src.engines.core.base_worker import BaseRenderWorker self.job_queue = self.session.query(BaseRenderWorker).all() pub.subscribe(self._local_job_status_changed, 'status_change') def _save_state(self) -> None: if self.session: self.session.commit() def _prepare_for_shutdown(self) -> None: logger.debug("Closing session") self.stop() running_jobs = self.jobs_with_status(RenderStatus.RUNNING) for job in running_jobs: self.cancel_job(job) self.save_state() if self.session: self.session.close() # -------------------------------------------- # Renderer Availability: # -------------------------------------------- def renderer_instances(self) -> Counter: all_instances = [x.engine_name for x in self.running_jobs()] return Counter(all_instances) def _is_available_for_job(self, renderer: str, priority: int = 2) -> bool: instances = self.renderer_instances() higher_priority_jobs = [x for x in self.running_jobs() if x.priority < priority] max_allowed_instances = self.maximum_renderer_instances.get(renderer, 1) maxed_out_instances = renderer in instances and instances[renderer] >= max_allowed_instances return not maxed_out_instances and not higher_priority_jobs # -------------------------------------------- # Job Lifecycle Management: # -------------------------------------------- def _add_to_render_queue(self, render_job: BaseRenderWorker, force_start: bool = False) -> None: logger.info(f"Adding job to render queue: {render_job}") with self._lock: self.job_queue.append(render_job) if force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED): self.start_job(render_job) self.session.add(render_job) self.save_state() if self.is_running: self.evaluate_queue() def _start_job(self, job: BaseRenderWorker) -> None: logger.info(f'Starting job: {job}') job.start() self.save_state() def _cancel_job(self, job: BaseRenderWorker) -> bool: logger.info(f'Cancelling job: {job}') job.stop() return job.status == RenderStatus.CANCELLED def _delete_job(self, job: BaseRenderWorker) -> bool: logger.info(f"Deleting job: {job}") with self._lock: job.stop() self.job_queue.remove(job) self.session.delete(job) self.save_state() return True # -------------------------------------------- # Miscellaneous: # -------------------------------------------- def _clear_history(self) -> None: for job in list(self.all_jobs()): if job.status in (RenderStatus.CANCELLED, RenderStatus.COMPLETED, RenderStatus.ERROR): self.delete_job(job) self.save_state() # --- Forwarders for backward compatibility --- @classmethod def start(cls): if cls._default_instance is not None: cls._default_instance._start() @classmethod def evaluate_queue(cls): if cls._default_instance is not None: cls._default_instance._evaluate_queue() @classmethod def stop(cls): if cls._default_instance is not None: cls._default_instance._stop() @classmethod def all_jobs(cls): if cls._default_instance is not None: return cls._default_instance.job_queue return [] @classmethod def running_jobs(cls): if cls._default_instance is not None: return cls._default_instance._running_jobs() return [] @classmethod def pending_jobs(cls): if cls._default_instance is not None: return cls._default_instance._pending_jobs() return [] @classmethod def jobs_with_status(cls, status, priority_sorted=False): if cls._default_instance is not None: return cls._default_instance._jobs_with_status(status, priority_sorted) return [] @classmethod def job_with_id(cls, job_id, none_ok=False): if cls._default_instance is not None: return cls._default_instance._job_with_id(job_id, none_ok) if not none_ok: raise JobNotFoundError(job_id) return None @classmethod def job_counts(cls): if cls._default_instance is not None: return cls._default_instance._job_counts() return {} @classmethod def load_state(cls, database_directory): if cls._default_instance is not None: cls._default_instance._load_state(database_directory) @classmethod def save_state(cls): if cls._default_instance is not None: cls._default_instance._save_state() @classmethod def prepare_for_shutdown(cls): if cls._default_instance is not None: cls._default_instance._prepare_for_shutdown() @classmethod def is_available_for_job(cls, renderer, priority=2): if cls._default_instance is not None: return cls._default_instance._is_available_for_job(renderer, priority) return True @classmethod def add_to_render_queue(cls, render_job, force_start=False): if cls._default_instance is not None: cls._default_instance._add_to_render_queue(render_job, force_start) @classmethod def start_job(cls, job): if cls._default_instance is not None: cls._default_instance._start_job(job) @classmethod def cancel_job(cls, job): if cls._default_instance is not None: return cls._default_instance._cancel_job(job) return False @classmethod def delete_job(cls, job): if cls._default_instance is not None: return cls._default_instance._delete_job(job) return False @classmethod def clear_history(cls): if cls._default_instance is not None: cls._default_instance._clear_history()