Major file reorganization (#26)

* Major file reorganization

* Rearrange imports

* Fix default log level
This commit is contained in:
2023-06-30 21:24:40 -05:00
committed by GitHub
parent 34fbdaa4d9
commit a475aa999a
46 changed files with 117 additions and 128 deletions

160
src/render_queue.py Executable file
View File

@@ -0,0 +1,160 @@
import logging
from datetime import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from src.utilities.status_utils import RenderStatus
from src.worker_factory import RenderWorkerFactory
from src.workers.base_worker import Base
logger = logging.getLogger()
class JobNotFoundError(Exception):
def __init__(self, job_id, *args):
super().__init__(args)
self.job_id = job_id
class RenderQueue:
engine = create_engine('sqlite:///database.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
job_queue = []
maximum_renderer_instances = {'blender': 1, 'aerender': 1, 'ffmpeg': 4}
last_saved_counts = {}
def __init__(self):
pass
@classmethod
def start_queue(cls):
cls.load_state()
@classmethod
def job_status_change(cls, job_id, status):
logger.debug(f"Job status changed: {job_id} -> {status}")
@classmethod
def add_to_render_queue(cls, render_job, force_start=False):
logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job))
cls.job_queue.append(render_job)
if force_start:
cls.start_job(render_job)
cls.session.add(render_job)
cls.save_state()
@classmethod
def all_jobs(cls):
return cls.job_queue
@classmethod
def running_jobs(cls):
return cls.jobs_with_status(RenderStatus.RUNNING)
@classmethod
def pending_jobs(cls):
pending_jobs = cls.jobs_with_status(RenderStatus.NOT_STARTED)
pending_jobs.extend(cls.jobs_with_status(RenderStatus.SCHEDULED))
return pending_jobs
@classmethod
def jobs_with_status(cls, status, priority_sorted=False):
found_jobs = [x for x in cls.all_jobs() if x.status == status]
if priority_sorted:
found_jobs = sorted(found_jobs, key=lambda a: a.priority, reverse=False)
return found_jobs
@classmethod
def job_with_id(cls, job_id, none_ok=False):
found_job = next((x for x in cls.all_jobs() if x.id == job_id), None)
if not found_job and not none_ok:
raise JobNotFoundError(job_id)
return found_job
@classmethod
def clear_history(cls):
to_remove = [x for x in cls.all_jobs() if x.status in [RenderStatus.CANCELLED,
RenderStatus.COMPLETED, RenderStatus.ERROR]]
for job_to_remove in to_remove:
cls.delete_job(job_to_remove)
cls.save_state()
@classmethod
def load_state(cls):
from src.workers.base_worker import BaseRenderWorker
cls.job_queue = cls.session.query(BaseRenderWorker).all()
@classmethod
def save_state(cls):
cls.session.commit()
@classmethod
def prepare_for_shutdown(cls):
running_jobs = cls.jobs_with_status(RenderStatus.RUNNING) # cancel all running jobs
for job in running_jobs:
cls.cancel_job(job)
cls.save_state()
@classmethod
def is_available_for_job(cls, renderer, priority=2):
if not RenderWorkerFactory.class_for_name(renderer).engine.renderer_path():
return False
instances = cls.renderer_instances()
higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < priority]
max_allowed_instances = cls.maximum_renderer_instances.get(renderer, 1)
maxed_out_instances = renderer in instances.keys() and instances[renderer] >= max_allowed_instances
return not maxed_out_instances and not higher_priority_jobs
@classmethod
def evaluate_queue(cls):
not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
for job in not_started:
if cls.is_available_for_job(job.renderer, job.priority):
cls.start_job(job)
scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True)
for job in scheduled:
if job.scheduled_start <= datetime.now():
logger.debug(f"Starting scheduled job: {job}")
cls.start_job(job)
if cls.last_saved_counts != cls.job_counts():
cls.save_state()
@classmethod
def start_job(cls, job):
logger.info(f'Starting render: {job.name} - Priority {job.priority}')
job.start()
cls.save_state()
@classmethod
def cancel_job(cls, job):
logger.info(f'Cancelling job ID: {job.id}')
job.stop()
return job.status == RenderStatus.CANCELLED
@classmethod
def delete_job(cls, job):
logger.info(f"Deleting job ID: {job.id}")
job.stop()
cls.job_queue.remove(job)
cls.session.delete(job)
cls.save_state()
return True
@classmethod
def renderer_instances(cls):
from collections import Counter
all_instances = [x.renderer for x in cls.running_jobs()]
return Counter(all_instances)
@classmethod
def job_counts(cls):
job_counts = {}
for job_status in RenderStatus:
job_counts[job_status.value] = len(cls.jobs_with_status(job_status))
return job_counts