Files
Zordon/lib/render_queue.py
Brett Williams 76e413c18d Assign frame ranges to servers based on their CPU count (#19)
* Expose renderer availability in status api

* Remove redundant is_available_for_job API call

* New server split logic by cpu and moved to server_helper.py

* Remove old dead code

* Add RenderStatus.WAITING to proxy categories
2023-06-16 00:04:02 -05:00

143 lines
4.6 KiB
Python
Executable File

import logging
from datetime import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from .workers.base_worker import RenderStatus, BaseRenderWorker, Base
from .workers.worker_factory import RenderWorkerFactory
# Logger shared by everything in this module. Calling getLogger() without a
# name returns the root logger, so records are handled by the root configuration.
logger = logging.getLogger()
class JobNotFoundError(Exception):
    """Raised when a job ID does not match any job in the render queue.

    Args:
        job_id: The ID that was looked up. Kept on the instance as ``job_id``.
        *args: Optional message/extra values forwarded to ``Exception``. When
            omitted, a default message naming the job ID is used.
    """

    def __init__(self, job_id, *args):
        # Unpack the extra arguments so they become the exception's own args.
        # Passing the tuple itself would nest it and make str(error) a tuple repr.
        if not args:
            args = (f'Job not found: {job_id}',)
        super().__init__(*args)
        self.job_id = job_id
class RenderQueue:
    """Class-level queue of render jobs persisted through a SQLAlchemy session.

    All state is stored on the class and every operation is a classmethod, so
    the class itself is the shared queue; instances carry no state of their own.
    """

    # Database plumbing, created once when the class body is executed.
    engine = create_engine('sqlite:///database.db')
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()

    # Every known job, whatever its status.
    job_queue = []
    # Maximum simultaneously running jobs per renderer name; renderers that are
    # not listed are limited to 1 (see is_available_for_job).
    maximum_renderer_instances = {'blender': 1, 'aerender': 1, 'ffmpeg': 4}
    # Per-status job counts recorded by the most recent successful save_state().
    last_saved_counts = {}

    def __init__(self):
        pass

    @classmethod
    def add_to_render_queue(cls, render_job, force_start=False):
        """Track ``render_job`` in the queue and persist it.

        Args:
            render_job: The job to add.
            force_start: When true, start the job immediately instead of
                waiting for evaluate_queue().
        """
        logger.debug('Adding priority %s job to render queue: %s', render_job.priority, render_job)
        cls.job_queue.append(render_job)
        # Register with the session before starting, so the in-memory queue and
        # the session stay in agreement even if starting the job raises.
        cls.session.add(render_job)
        if force_start:
            cls.start_job(render_job)
        cls.save_state()

    @classmethod
    def all_jobs(cls):
        """Return the list of every job in the queue (the live list, not a copy)."""
        return cls.job_queue

    @classmethod
    def running_jobs(cls):
        """Return the jobs whose status is RenderStatus.RUNNING."""
        return cls.jobs_with_status(RenderStatus.RUNNING)

    @classmethod
    def pending_jobs(cls):
        """Return NOT_STARTED jobs followed by SCHEDULED jobs."""
        return (cls.jobs_with_status(RenderStatus.NOT_STARTED)
                + cls.jobs_with_status(RenderStatus.SCHEDULED))

    @classmethod
    def jobs_with_status(cls, status, priority_sorted=False):
        """Return the jobs whose status equals ``status``.

        Args:
            status: The RenderStatus value to match.
            priority_sorted: When true, order the result by ascending
                ``priority`` value.
        """
        found_jobs = [job for job in cls.all_jobs() if job.status == status]
        if priority_sorted:
            found_jobs.sort(key=lambda job: job.priority)
        return found_jobs

    @classmethod
    def job_with_id(cls, job_id, none_ok=False):
        """Return the job whose ``id`` equals ``job_id``.

        Args:
            job_id: ID to look for.
            none_ok: When true, return None instead of raising if no job matches.

        Raises:
            JobNotFoundError: No job matches and ``none_ok`` is false.
        """
        found_job = next((job for job in cls.all_jobs() if job.id == job_id), None)
        # Compare against None explicitly: a missing job is the only failure case,
        # independent of how a job object evaluates in a boolean context.
        if found_job is None and not none_ok:
            raise JobNotFoundError(job_id)
        return found_job

    @classmethod
    def clear_history(cls):
        """Delete every job that is CANCELLED, COMPLETED or ERROR."""
        finished_states = (RenderStatus.CANCELLED, RenderStatus.COMPLETED, RenderStatus.ERROR)
        # Build the list first; delete_job() mutates job_queue while we iterate.
        to_remove = [job for job in cls.all_jobs() if job.status in finished_states]
        for job_to_remove in to_remove:
            cls.delete_job(job_to_remove)
        cls.save_state()

    @classmethod
    def load_state(cls):
        """Replace the in-memory queue with every BaseRenderWorker row in the session."""
        cls.job_queue = cls.session.query(BaseRenderWorker).all()

    @classmethod
    def save_state(cls):
        """Commit the session and remember the per-status job counts that were saved."""
        cls.session.commit()
        # Record the snapshot only after a successful commit, so that a failed
        # commit is retried by the change check in evaluate_queue().
        cls.last_saved_counts = cls.job_counts()

    @classmethod
    def is_available_for_job(cls, renderer, priority=2):
        """Return whether a job for ``renderer`` at ``priority`` may start now.

        A job may start only when the renderer reports a renderer path, the
        renderer is below its instance limit, and no running job has a
        numerically lower (i.e. more urgent) priority value.
        """
        if not RenderWorkerFactory.class_for_name(renderer).engine.renderer_path():
            return False

        # Counter returns 0 for renderers with no running jobs.
        running_count = cls.renderer_instances()[renderer]
        at_capacity = running_count >= cls.maximum_renderer_instances.get(renderer, 1)
        blocked_by_priority = any(job.priority < priority for job in cls.running_jobs())
        return not at_capacity and not blocked_by_priority

    @classmethod
    def evaluate_queue(cls):
        """Start jobs that are ready to run and persist if job counts changed.

        NOT_STARTED jobs start when is_available_for_job() allows it. SCHEDULED
        jobs start once their ``scheduled_start`` has passed; they are started
        without consulting is_available_for_job().
        """
        not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
        for job in not_started:
            if cls.is_available_for_job(job.renderer, job.priority):
                cls.start_job(job)

        scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True)
        for job in scheduled:
            if job.scheduled_start <= datetime.now():
                logger.debug(f"Starting scheduled job: {job}")
                cls.start_job(job)

        # save_state() refreshes last_saved_counts, so this only commits when the
        # per-status counts differ from what was last written.
        if cls.last_saved_counts != cls.job_counts():
            cls.save_state()

    @classmethod
    def start_job(cls, job):
        """Start ``job`` and persist the queue state."""
        logger.info(f'Starting render: {job.name} - Priority {job.priority}')
        job.start()
        cls.save_state()

    @classmethod
    def cancel_job(cls, job):
        """Stop ``job`` and return whether its status is now CANCELLED."""
        logger.info(f'Cancelling job ID: {job.id}')
        job.stop()
        return job.status == RenderStatus.CANCELLED

    @classmethod
    def delete_job(cls, job):
        """Stop ``job``, remove it from the queue and the session, and persist.

        Returns:
            True once the job has been removed.
        """
        logger.info(f"Deleting job ID: {job.id}")
        job.stop()
        cls.job_queue.remove(job)
        cls.session.delete(job)
        cls.save_state()
        return True

    @classmethod
    def renderer_instances(cls):
        """Return a Counter mapping renderer name to its number of running jobs."""
        from collections import Counter
        return Counter(job.renderer for job in cls.running_jobs())

    @classmethod
    def job_counts(cls):
        """Return a dict mapping each RenderStatus value to the number of jobs in it."""
        return {status.value: len(cls.jobs_with_status(status)) for status in RenderStatus}