Files
Zordon/src/render_queue.py
T

300 lines
10 KiB
Python
Executable File

import logging
import threading
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from pubsub import pub
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.orm.exc import DetachedInstanceError

from src.engines.core.base_worker import Base, BaseRenderWorker
from src.utilities.status_utils import RenderStatus
# Module-level logger. getLogger() without a name returns the root logger,
# so every record emitted from this module goes through the root logger.
logger = logging.getLogger()
class JobNotFoundError(Exception):
    """Raised when no job in the queue matches a requested ID.

    Attributes:
        job_id: The ID that could not be found.
    """

    def __init__(self, job_id: str, *args: Any) -> None:
        """Store the missing job ID.

        Args:
            job_id: The ID that was looked up.
            *args: Extra positional values kept on ``self.args``.
        """
        # Forward job_id and the extras individually. Passing the ``args``
        # tuple as one value leaves ``self.args == ((...),)``, so rebuilding
        # the exception via ``cls(*self.args)`` (copy/pickle) loses the ID.
        super().__init__(job_id, *args)
        self.job_id = job_id

    def __str__(self) -> str:
        """Return a human-readable message naming the missing job ID."""
        return f"Cannot find job with ID: {self.job_id}"
class RenderQueue:
    """Priority-ordered queue of render jobs persisted through a SQLAlchemy session.

    All state lives on an instance and the real logic is in the
    underscore-prefixed instance methods. The same-named classmethods at the
    bottom of the class forward to ``_default_instance``; when it is unset
    they are no-ops or return a neutral value (``job_with_id`` raises unless
    ``none_ok`` is true).

    Instance methods call their private siblings directly (``self._foo()``)
    rather than the class-level forwarders, so an instance always operates on
    its own state even if it is not the registered default instance.
    """

    # Instance targeted by the class-level forwarders.
    # NOTE(review): nothing in this class assigns it -- presumably the code
    # that constructs the queue registers it; confirm.
    _default_instance: Optional['RenderQueue'] = None

    @classmethod
    def _sync_class(cls) -> None:
        """Placeholder hook; there is currently nothing to synchronise."""
        if cls._default_instance is not None:
            pass  # no class-level attributes to sync

    def __init__(self) -> None:
        """Create an empty, stopped queue with no database attached."""
        self.engine: Optional[Engine] = None
        self.session: Optional[Session] = None
        self.job_queue: List[BaseRenderWorker] = []
        # Per-engine cap on concurrently running jobs; engines not listed
        # here are treated as having a cap of 1 (see _is_available_for_job).
        self.maximum_renderer_instances: Dict[str, int] = {'blender': 1, 'aerender': 1, 'ffmpeg': 4}
        # Job counts as of the last save triggered by queue evaluation.
        self.last_saved_counts: Dict[str, int] = {}
        self.is_running: bool = False
        self._lock = threading.Lock()

    # --------------------------------------------
    # Render Queue Evaluation:
    # --------------------------------------------

    def _start(self) -> None:
        """Mark the queue as running and evaluate it immediately."""
        logger.debug("Starting render queue updates")
        self.is_running = True
        self._evaluate_queue()

    def _evaluate_queue(self) -> None:
        """Start every job that is ready to run, then persist if counts changed.

        NOT_STARTED jobs are started in ascending ``priority`` order while the
        engine has capacity. SCHEDULED jobs are started once their
        ``scheduled_start`` has passed. A ``DetachedInstanceError`` raised
        anywhere in the pass aborts it (best-effort) and is logged at debug
        level.
        """
        try:
            for job in self._jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True):
                # Availability is re-checked for every candidate job.
                if self._is_available_for_job(job.engine_name, job.priority):
                    self._start_job(job)

            for job in self._jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True):
                if job.scheduled_start <= datetime.now():
                    logger.debug(f"Starting scheduled job: {job}")
                    self._start_job(job)

            counts = self._job_counts()
            if self.last_saved_counts != counts:
                self._save_state()
                # Remember what was saved; without this the comparison above
                # is always true and every evaluation commits.
                self.last_saved_counts = counts
        except DetachedInstanceError:
            logger.debug("Queue evaluation aborted: job is detached from the session", exc_info=True)

    def _local_job_status_changed(self, job_id: str, old_status: str, new_status: str) -> None:
        """Re-evaluate the queue when a job known to this queue changes status.

        Subscribed to the ``'status_change'`` pubsub topic in ``_load_state``.
        Ignored when the job is not in this queue or the queue is stopped.
        """
        render_job = self._job_with_id(job_id, none_ok=True)
        if render_job and self.is_running:
            logger.debug(f"RenderQueue detected job {job_id} has changed from {old_status} -> {new_status}")
            self._evaluate_queue()

    def _stop(self) -> None:
        """Mark the queue as stopped; status changes no longer trigger evaluation."""
        logger.debug("Stopping render queue updates")
        self.is_running = False

    # --------------------------------------------
    # Fetch Jobs:
    # --------------------------------------------

    def _all_jobs(self) -> List[BaseRenderWorker]:
        """Return the live job list (not a copy)."""
        return self.job_queue

    def _running_jobs(self) -> List[BaseRenderWorker]:
        """Return the jobs whose status is RUNNING."""
        return self._jobs_with_status(RenderStatus.RUNNING)

    def _pending_jobs(self) -> List[BaseRenderWorker]:
        """Return NOT_STARTED jobs followed by SCHEDULED jobs."""
        return (self._jobs_with_status(RenderStatus.NOT_STARTED)
                + self._jobs_with_status(RenderStatus.SCHEDULED))

    def _jobs_with_status(self, status: RenderStatus, priority_sorted: bool = False) -> List[BaseRenderWorker]:
        """Return a new list of the jobs with the given status.

        Args:
            status: Status to match.
            priority_sorted: When true, sort ascending by ``priority``.
        """
        found_jobs = [job for job in self._all_jobs() if job.status == status]
        if priority_sorted:
            found_jobs.sort(key=lambda job: job.priority)
        return found_jobs

    def _job_with_id(self, job_id: str, none_ok: bool = False) -> Optional[BaseRenderWorker]:
        """Return the job whose ``id`` equals ``job_id``.

        Raises:
            JobNotFoundError: No job matches and ``none_ok`` is false.
        """
        found_job = next((job for job in self._all_jobs() if job.id == job_id), None)
        if found_job is None and not none_ok:
            raise JobNotFoundError(job_id)
        return found_job

    def _job_counts(self) -> Dict[str, int]:
        """Return the number of jobs per status, keyed by each status's ``value``.

        Every ``RenderStatus`` member is present, with 0 when no job has it.
        """
        counts = Counter(job.status for job in self._all_jobs())
        return {status.value: counts.get(status, 0) for status in RenderStatus}

    # --------------------------------------------
    # Startup / Shutdown:
    # --------------------------------------------

    def _load_state(self, database_directory: Path) -> None:
        """Open ``database.db`` in the given directory and load all stored jobs.

        Creates the tables declared on ``Base`` if needed, replaces
        ``job_queue`` with the stored jobs, and subscribes to the
        ``'status_change'`` pubsub topic.

        Args:
            database_directory: Directory holding ``database.db``; a ``str``
                is accepted as well as a ``Path``.
        """
        database_path = Path(database_directory) / 'database.db'
        self.engine = create_engine(f"sqlite:///{database_path}")
        Base.metadata.create_all(self.engine)
        self.session = sessionmaker(bind=self.engine)()
        self.job_queue = self.session.query(BaseRenderWorker).all()
        pub.subscribe(self._local_job_status_changed, 'status_change')

    def _save_state(self) -> None:
        """Commit the session, if one is open."""
        if self.session:
            self.session.commit()

    def _prepare_for_shutdown(self) -> None:
        """Stop the queue, cancel running jobs, commit, and close the session."""
        logger.debug("Closing session")
        self._stop()
        for job in self._jobs_with_status(RenderStatus.RUNNING):
            self._cancel_job(job)
        self._save_state()
        if self.session:
            self.session.close()

    # --------------------------------------------
    # Renderer Availability:
    # --------------------------------------------

    def renderer_instances(self) -> Counter:
        """Return a Counter of running jobs keyed by ``engine_name``."""
        return Counter(job.engine_name for job in self._running_jobs())

    def _is_available_for_job(self, renderer: str, priority: int = 2) -> bool:
        """Return whether a job for ``renderer`` at ``priority`` may start now.

        A job may start only if no running job has a numerically lower
        ``priority`` value (treated as higher priority) and the renderer has
        not reached its cap in ``maximum_renderer_instances`` (default 1).
        """
        if any(job.priority < priority for job in self._running_jobs()):
            return False
        instances = self.renderer_instances()
        max_allowed_instances = self.maximum_renderer_instances.get(renderer, 1)
        maxed_out_instances = renderer in instances and instances[renderer] >= max_allowed_instances
        return not maxed_out_instances

    # --------------------------------------------
    # Job Lifecycle Management:
    # --------------------------------------------

    def _add_to_render_queue(self, render_job: BaseRenderWorker, force_start: bool = False) -> None:
        """Append a job to the queue, persist it, and optionally start it.

        Args:
            render_job: Job to add.
            force_start: Start the job immediately if it is NOT_STARTED or
                SCHEDULED, without checking renderer availability.
        """
        logger.info(f"Adding job to render queue: {render_job}")
        # NOTE(review): the lock guards the queue/session mutation; queue
        # evaluation runs after it is released. Confirm the intended scope.
        with self._lock:
            self.job_queue.append(render_job)
            if force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED):
                self._start_job(render_job)
            # A queue without a loaded database still works in memory.
            if self.session is not None:
                self.session.add(render_job)
            self._save_state()
        if self.is_running:
            self._evaluate_queue()

    def _start_job(self, job: BaseRenderWorker) -> None:
        """Start the job and commit."""
        logger.info(f'Starting job: {job}')
        job.start()
        self._save_state()

    def _cancel_job(self, job: BaseRenderWorker) -> bool:
        """Stop the job; return whether its status is now CANCELLED."""
        logger.info(f'Cancelling job: {job}')
        job.stop()
        return job.status == RenderStatus.CANCELLED

    def _delete_job(self, job: BaseRenderWorker) -> bool:
        """Stop the job and remove it from the queue and the database.

        Returns:
            Always ``True``.

        Raises:
            ValueError: The job is not in the queue (from ``list.remove``).
        """
        logger.info(f"Deleting job: {job}")
        with self._lock:
            job.stop()
            self.job_queue.remove(job)
            if self.session is not None:
                self.session.delete(job)
            self._save_state()
        return True

    # --------------------------------------------
    # Miscellaneous:
    # --------------------------------------------

    def _clear_history(self) -> None:
        """Delete every CANCELLED, COMPLETED, or ERROR job, then commit."""
        finished = (RenderStatus.CANCELLED, RenderStatus.COMPLETED, RenderStatus.ERROR)
        # Iterate over a snapshot: _delete_job mutates job_queue.
        for job in list(self._all_jobs()):
            if job.status in finished:
                self._delete_job(job)
        self._save_state()

    # --- Forwarders for backward compatibility ---

    @classmethod
    def start(cls) -> None:
        """Forward to ``_start`` on the default instance, if any."""
        if cls._default_instance is not None:
            cls._default_instance._start()

    @classmethod
    def evaluate_queue(cls) -> None:
        """Forward to ``_evaluate_queue`` on the default instance, if any."""
        if cls._default_instance is not None:
            cls._default_instance._evaluate_queue()

    @classmethod
    def stop(cls) -> None:
        """Forward to ``_stop`` on the default instance, if any."""
        if cls._default_instance is not None:
            cls._default_instance._stop()

    @classmethod
    def all_jobs(cls) -> List[BaseRenderWorker]:
        """Return the default instance's job list, or ``[]`` without one."""
        if cls._default_instance is not None:
            return cls._default_instance._all_jobs()
        return []

    @classmethod
    def running_jobs(cls) -> List[BaseRenderWorker]:
        """Return the default instance's RUNNING jobs, or ``[]`` without one."""
        if cls._default_instance is not None:
            return cls._default_instance._running_jobs()
        return []

    @classmethod
    def pending_jobs(cls) -> List[BaseRenderWorker]:
        """Return the default instance's pending jobs, or ``[]`` without one."""
        if cls._default_instance is not None:
            return cls._default_instance._pending_jobs()
        return []

    @classmethod
    def jobs_with_status(cls, status: RenderStatus, priority_sorted: bool = False) -> List[BaseRenderWorker]:
        """Forward to ``_jobs_with_status``; ``[]`` without a default instance."""
        if cls._default_instance is not None:
            return cls._default_instance._jobs_with_status(status, priority_sorted)
        return []

    @classmethod
    def job_with_id(cls, job_id: str, none_ok: bool = False) -> Optional[BaseRenderWorker]:
        """Forward to ``_job_with_id``.

        Without a default instance, returns ``None`` when ``none_ok`` is true
        and raises ``JobNotFoundError`` otherwise.
        """
        if cls._default_instance is not None:
            return cls._default_instance._job_with_id(job_id, none_ok)
        if not none_ok:
            raise JobNotFoundError(job_id)
        return None

    @classmethod
    def job_counts(cls) -> Dict[str, int]:
        """Forward to ``_job_counts``; ``{}`` without a default instance."""
        if cls._default_instance is not None:
            return cls._default_instance._job_counts()
        return {}

    @classmethod
    def load_state(cls, database_directory: Path) -> None:
        """Forward to ``_load_state`` on the default instance, if any."""
        if cls._default_instance is not None:
            cls._default_instance._load_state(database_directory)

    @classmethod
    def save_state(cls) -> None:
        """Forward to ``_save_state`` on the default instance, if any."""
        if cls._default_instance is not None:
            cls._default_instance._save_state()

    @classmethod
    def prepare_for_shutdown(cls) -> None:
        """Forward to ``_prepare_for_shutdown`` on the default instance, if any."""
        if cls._default_instance is not None:
            cls._default_instance._prepare_for_shutdown()

    @classmethod
    def is_available_for_job(cls, renderer: str, priority: int = 2) -> bool:
        """Forward to ``_is_available_for_job``; ``True`` without a default instance."""
        if cls._default_instance is not None:
            return cls._default_instance._is_available_for_job(renderer, priority)
        return True

    @classmethod
    def add_to_render_queue(cls, render_job: BaseRenderWorker, force_start: bool = False) -> None:
        """Forward to ``_add_to_render_queue`` on the default instance, if any."""
        if cls._default_instance is not None:
            cls._default_instance._add_to_render_queue(render_job, force_start)

    @classmethod
    def start_job(cls, job: BaseRenderWorker) -> None:
        """Forward to ``_start_job`` on the default instance, if any."""
        if cls._default_instance is not None:
            cls._default_instance._start_job(job)

    @classmethod
    def cancel_job(cls, job: BaseRenderWorker) -> bool:
        """Forward to ``_cancel_job``; ``False`` without a default instance."""
        if cls._default_instance is not None:
            return cls._default_instance._cancel_job(job)
        return False

    @classmethod
    def delete_job(cls, job: BaseRenderWorker) -> bool:
        """Forward to ``_delete_job``; ``False`` without a default instance."""
        if cls._default_instance is not None:
            return cls._default_instance._delete_job(job)
        return False

    @classmethod
    def clear_history(cls) -> None:
        """Forward to ``_clear_history`` on the default instance, if any."""
        if cls._default_instance is not None:
            cls._default_instance._clear_history()