Reorganized render_queue.py

This commit is contained in:
Brett Williams
2024-08-21 01:12:12 -05:00
parent eaa796221d
commit 9951915bbe

View File

@@ -29,18 +29,37 @@ class RenderQueue:
maximum_renderer_instances = {'blender': 1, 'aerender': 1, 'ffmpeg': 4} maximum_renderer_instances = {'blender': 1, 'aerender': 1, 'ffmpeg': 4}
last_saved_counts = {} last_saved_counts = {}
is_running = False is_running = False
__eval_thread = None
# -------------------------------------------- # --------------------------------------------
# Start / Stop Background Updates # Render Queue Evaluation:
# -------------------------------------------- # --------------------------------------------
@classmethod @classmethod
def start(cls): def start(cls):
"""Start evaluating the render queue"""
logger.debug("Starting render queue updates") logger.debug("Starting render queue updates")
cls.is_running = True cls.is_running = True
cls.evaluate_queue() cls.evaluate_queue()
@classmethod
def evaluate_queue(cls):
try:
not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
for job in not_started:
if cls.is_available_for_job(job.renderer, job.priority):
cls.start_job(job)
scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True)
for job in scheduled:
if job.scheduled_start <= datetime.now():
logger.debug(f"Starting scheduled job: {job}")
cls.start_job(job)
if cls.last_saved_counts != cls.job_counts():
cls.save_state()
except DetachedInstanceError:
pass
@classmethod @classmethod
def __local_job_status_changed(cls, job_id, old_status, new_status): def __local_job_status_changed(cls, job_id, old_status, new_status):
render_job = RenderQueue.job_with_id(job_id, none_ok=True) render_job = RenderQueue.job_with_id(job_id, none_ok=True)
@@ -54,20 +73,9 @@ class RenderQueue:
cls.is_running = False cls.is_running = False
# -------------------------------------------- # --------------------------------------------
# Queue Management # Fetch Jobs:
# -------------------------------------------- # --------------------------------------------
@classmethod
def add_to_render_queue(cls, render_job, force_start=False):
logger.info(f"Adding job to render queue: {render_job}")
cls.job_queue.append(render_job)
if cls.is_running and force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED):
cls.start_job(render_job)
cls.session.add(render_job)
cls.save_state()
if cls.is_running:
cls.evaluate_queue()
@classmethod @classmethod
def all_jobs(cls): def all_jobs(cls):
return cls.job_queue return cls.job_queue
@@ -97,12 +105,15 @@ class RenderQueue:
return found_job return found_job
@classmethod @classmethod
def clear_history(cls): def job_counts(cls):
to_remove = [x for x in cls.all_jobs() if x.status in [RenderStatus.CANCELLED, job_counts = {}
RenderStatus.COMPLETED, RenderStatus.ERROR]] for job_status in RenderStatus:
for job_to_remove in to_remove: job_counts[job_status.value] = len(cls.jobs_with_status(job_status))
cls.delete_job(job_to_remove) return job_counts
cls.save_state()
# --------------------------------------------
# Startup / Shutdown:
# --------------------------------------------
@classmethod @classmethod
def load_state(cls, database_directory): def load_state(cls, database_directory):
@@ -127,6 +138,16 @@ class RenderQueue:
cls.save_state() cls.save_state()
cls.session.close() cls.session.close()
# --------------------------------------------
# Renderer Availability:
# --------------------------------------------
@classmethod
def renderer_instances(cls):
from collections import Counter
all_instances = [x.renderer for x in cls.running_jobs()]
return Counter(all_instances)
@classmethod @classmethod
def is_available_for_job(cls, renderer, priority=2): def is_available_for_job(cls, renderer, priority=2):
@@ -136,24 +157,20 @@ class RenderQueue:
maxed_out_instances = renderer in instances.keys() and instances[renderer] >= max_allowed_instances maxed_out_instances = renderer in instances.keys() and instances[renderer] >= max_allowed_instances
return not maxed_out_instances and not higher_priority_jobs return not maxed_out_instances and not higher_priority_jobs
# --------------------------------------------
# Job Lifecycle Management:
# --------------------------------------------
@classmethod @classmethod
def evaluate_queue(cls): def add_to_render_queue(cls, render_job, force_start=False):
try: logger.info(f"Adding job to render queue: {render_job}")
not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True) cls.job_queue.append(render_job)
for job in not_started: if cls.is_running and force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED):
if cls.is_available_for_job(job.renderer, job.priority): cls.start_job(render_job)
cls.start_job(job) cls.session.add(render_job)
cls.save_state()
scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True) if cls.is_running:
for job in scheduled: cls.evaluate_queue()
if job.scheduled_start <= datetime.now():
logger.debug(f"Starting scheduled job: {job}")
cls.start_job(job)
if cls.last_saved_counts != cls.job_counts():
cls.save_state()
except DetachedInstanceError:
pass
@classmethod @classmethod
def start_job(cls, job): def start_job(cls, job):
@@ -176,15 +193,14 @@ class RenderQueue:
cls.save_state() cls.save_state()
return True return True
@classmethod # --------------------------------------------
def renderer_instances(cls): # Miscellaneous:
from collections import Counter # --------------------------------------------
all_instances = [x.renderer for x in cls.running_jobs()]
return Counter(all_instances)
@classmethod @classmethod
def job_counts(cls): def clear_history(cls):
job_counts = {} to_remove = [x for x in cls.all_jobs() if x.status in [RenderStatus.CANCELLED,
for job_status in RenderStatus: RenderStatus.COMPLETED, RenderStatus.ERROR]]
job_counts[job_status.value] = len(cls.jobs_with_status(job_status)) for job_to_remove in to_remove:
return job_counts cls.delete_job(job_to_remove)
cls.save_state()