import logging
import os
import threading
import time
from collections import Counter
from datetime import datetime

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.exc import DetachedInstanceError

from src.engines.core.base_worker import Base
from src.utilities.status_utils import RenderStatus

logger = logging.getLogger()


class JobNotFoundError(Exception):
    """Raised when a job ID cannot be found in the render queue."""

    def __init__(self, job_id, *args):
        # Forward job_id (and any extras) as real positional args so that
        # ``e.args`` is meaningful and pickling/copying can rebuild the
        # exception with the same job_id.
        super().__init__(job_id, *args)
        self.job_id = job_id

    def __str__(self):
        return f"Cannot find job with ID: {self.job_id}"


class RenderQueue:
    """Class-level (singleton-style) queue of render jobs backed by a SQLAlchemy session.

    All state lives on the class itself and every method is a classmethod;
    the class is not meant to be instantiated.
    """

    engine = None
    session = None
    job_queue = []
    # Maximum number of simultaneously running jobs per renderer name.
    # Renderers missing from this mapping default to 1 (see is_available_for_job).
    maximum_renderer_instances = {'blender': 1, 'aerender': 1, 'ffmpeg': 4}
    # Per-status job counts recorded at the last save made by evaluate_queue().
    last_saved_counts = {}
    is_running = False
    __eval_thread = None
    # Seconds to sleep between queue evaluations. The misspelled name is kept
    # because it is a public attribute that external code may set.
    evaluation_inverval = 1

    # --------------------------------------------
    #   Start / Stop Background Updates
    # --------------------------------------------

    @classmethod
    def start(cls):
        """Start the daemon thread that periodically evaluates the queue."""
        cls.is_running = True
        cls.__eval_thread = threading.Thread(target=cls.__eval_loop, daemon=True)
        cls.__eval_thread.start()

    @classmethod
    def __eval_loop(cls):
        """Evaluate the queue every ``evaluation_inverval`` seconds until stopped."""
        while cls.is_running:
            try:
                cls.evaluate_queue()
            except Exception as e:
                # Top-level boundary of the background thread: log and keep looping.
                logger.exception(f"Uncaught error while evaluating queue: {e}")
            time.sleep(cls.evaluation_inverval)

    @classmethod
    def stop(cls):
        """Ask the background loop to exit after its current iteration."""
        cls.is_running = False

    # --------------------------------------------
    #   Queue Management
    # --------------------------------------------

    @classmethod
    def add_to_render_queue(cls, render_job, force_start=False):
        """Append a job to the queue and persist it.

        Args:
            render_job: The job to enqueue.
            force_start: When true, start the job immediately if its status is
                NOT_STARTED or SCHEDULED.
        """
        logger.debug('Adding priority %s job to render queue: %s', render_job.priority, render_job)
        cls.job_queue.append(render_job)
        if force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED):
            cls.start_job(render_job)
        cls.session.add(render_job)
        cls.save_state()

    @classmethod
    def all_jobs(cls):
        """Return the live list of every job in the queue."""
        return cls.job_queue

    @classmethod
    def running_jobs(cls):
        """Return the jobs whose status is RUNNING."""
        return cls.jobs_with_status(RenderStatus.RUNNING)

    @classmethod
    def pending_jobs(cls):
        """Return NOT_STARTED jobs followed by SCHEDULED jobs."""
        pending = cls.jobs_with_status(RenderStatus.NOT_STARTED)
        pending.extend(cls.jobs_with_status(RenderStatus.SCHEDULED))
        return pending

    @classmethod
    def jobs_with_status(cls, status, priority_sorted=False):
        """Return a new list of the jobs whose status equals ``status``.

        Args:
            status: The status value to match.
            priority_sorted: When true, sort ascending by ``priority``
                (lowest number first).
        """
        found_jobs = [job for job in cls.all_jobs() if job.status == status]
        if priority_sorted:
            found_jobs.sort(key=lambda job: job.priority)
        return found_jobs

    @classmethod
    def job_with_id(cls, job_id, none_ok=False):
        """Return the job whose ``id`` equals ``job_id``.

        Args:
            job_id: ID to look up.
            none_ok: When true, return ``None`` instead of raising if no job matches.

        Raises:
            JobNotFoundError: If no job matches and ``none_ok`` is false.
        """
        found_job = next((job for job in cls.all_jobs() if job.id == job_id), None)
        if not found_job and not none_ok:
            raise JobNotFoundError(job_id)
        return found_job

    @classmethod
    def clear_history(cls):
        """Delete every CANCELLED, COMPLETED or ERROR job from the queue."""
        finished_statuses = (RenderStatus.CANCELLED, RenderStatus.COMPLETED, RenderStatus.ERROR)
        # Snapshot first: delete_job() mutates job_queue while we iterate.
        to_remove = [job for job in cls.all_jobs() if job.status in finished_statuses]
        for job_to_remove in to_remove:
            cls.delete_job(job_to_remove)
        cls.save_state()

    @classmethod
    def load_state(cls, database_directory):
        """Open ``database.db`` in ``database_directory`` and load the saved jobs.

        Does nothing when an engine has already been created.
        """
        # NOTE(review): the source reviewed had its indentation collapsed, so
        # the extent of this guard was reconstructed; confirm that the query
        # below is meant to run only on first load.
        if not cls.engine:
            cls.engine = create_engine(f"sqlite:///{os.path.join(database_directory, 'database.db')}")
            Base.metadata.create_all(cls.engine)
            cls.session = sessionmaker(bind=cls.engine)()
            from src.engines.core.base_worker import BaseRenderWorker
            cls.job_queue = cls.session.query(BaseRenderWorker).all()

    @classmethod
    def save_state(cls):
        """Commit pending changes on the shared session."""
        cls.session.commit()

    @classmethod
    def prepare_for_shutdown(cls):
        """Stop the background loop, cancel running jobs, save, and close the session."""
        logger.debug("Closing session")
        cls.stop()
        # cancel all running jobs
        for job in cls.jobs_with_status(RenderStatus.RUNNING):
            cls.cancel_job(job)
        cls.save_state()
        cls.session.close()

    @classmethod
    def is_available_for_job(cls, renderer, priority=2):
        """Return whether a job for ``renderer`` at ``priority`` may start now.

        A job may start when the renderer has not reached its configured
        instance limit and no running job has a numerically lower priority
        value than ``priority``.
        """
        instances = cls.renderer_instances()
        max_allowed_instances = cls.maximum_renderer_instances.get(renderer, 1)
        maxed_out_instances = renderer in instances and instances[renderer] >= max_allowed_instances
        higher_priority_running = any(job.priority < priority for job in cls.running_jobs())
        return not maxed_out_instances and not higher_priority_running

    @classmethod
    def evaluate_queue(cls):
        """Start eligible jobs and persist state when the per-status counts change.

        NOT_STARTED jobs are started in priority order when
        ``is_available_for_job`` allows it. SCHEDULED jobs are started once
        their ``scheduled_start`` has passed, without the availability check.
        """
        try:
            not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
            for job in not_started:
                if cls.is_available_for_job(job.renderer, job.priority):
                    cls.start_job(job)

            scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True)
            for job in scheduled:
                if job.scheduled_start <= datetime.now():
                    logger.debug(f"Starting scheduled job: {job}")
                    cls.start_job(job)

            current_counts = cls.job_counts()
            if cls.last_saved_counts != current_counts:
                cls.save_state()
                # Remember what was saved; without this the comparison above
                # is always true and the session is committed on every tick.
                cls.last_saved_counts = current_counts
        except DetachedInstanceError as e:
            # Deliberately best-effort: skip this pass, but leave a trace.
            logger.debug(f"Skipping queue evaluation, detached instance: {e}")

    @classmethod
    def start_job(cls, job):
        """Start ``job`` and persist the resulting state."""
        logger.info(f'Starting job: {job} - Priority {job.priority}')
        job.start()
        cls.save_state()

    @classmethod
    def cancel_job(cls, job):
        """Stop ``job`` and return whether its status is now CANCELLED."""
        logger.info(f'Cancelling job: {job}')
        job.stop()
        return job.status == RenderStatus.CANCELLED

    @classmethod
    def delete_job(cls, job):
        """Stop ``job``, remove it from the queue and the session, and save.

        Returns:
            Always ``True``.
        """
        logger.info(f"Deleting job: {job}")
        job.stop()
        cls.job_queue.remove(job)
        cls.session.delete(job)
        cls.save_state()
        return True

    @classmethod
    def renderer_instances(cls):
        """Return a ``Counter`` mapping renderer name to its number of running jobs."""
        return Counter(job.renderer for job in cls.running_jobs())

    @classmethod
    def job_counts(cls):
        """Return a dict mapping each ``RenderStatus`` value to its job count."""
        return {status.value: len(cls.jobs_with_status(status)) for status in RenderStatus}