From 9951915bbef362622319ff640b4a71505193ecf0 Mon Sep 17 00:00:00 2001 From: Brett Williams Date: Wed, 21 Aug 2024 01:12:12 -0500 Subject: [PATCH] Reorganized render_queue.py --- src/render_queue.py | 110 +++++++++++++++++++++++++------------------- 1 file changed, 63 insertions(+), 47 deletions(-) diff --git a/src/render_queue.py b/src/render_queue.py index c81a72d..c6df222 100755 --- a/src/render_queue.py +++ b/src/render_queue.py @@ -29,18 +29,37 @@ class RenderQueue: maximum_renderer_instances = {'blender': 1, 'aerender': 1, 'ffmpeg': 4} last_saved_counts = {} is_running = False - __eval_thread = None # -------------------------------------------- - # Start / Stop Background Updates + # Render Queue Evaluation: # -------------------------------------------- @classmethod def start(cls): + """Start evaluating the render queue""" logger.debug("Starting render queue updates") cls.is_running = True cls.evaluate_queue() + @classmethod + def evaluate_queue(cls): + try: + not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True) + for job in not_started: + if cls.is_available_for_job(job.renderer, job.priority): + cls.start_job(job) + + scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True) + for job in scheduled: + if job.scheduled_start <= datetime.now(): + logger.debug(f"Starting scheduled job: {job}") + cls.start_job(job) + + if cls.last_saved_counts != cls.job_counts(): + cls.save_state() + except DetachedInstanceError: + pass + @classmethod def __local_job_status_changed(cls, job_id, old_status, new_status): render_job = RenderQueue.job_with_id(job_id, none_ok=True) @@ -54,20 +73,9 @@ class RenderQueue: cls.is_running = False # -------------------------------------------- - # Queue Management + # Fetch Jobs: # -------------------------------------------- - @classmethod - def add_to_render_queue(cls, render_job, force_start=False): - logger.info(f"Adding job to render queue: {render_job}") - cls.job_queue.append(render_job) - if cls.is_running and force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED): - cls.start_job(render_job) - cls.session.add(render_job) - cls.save_state() - if cls.is_running: - cls.evaluate_queue() - @classmethod def all_jobs(cls): return cls.job_queue @@ -97,12 +105,15 @@ class RenderQueue: return found_job @classmethod - def clear_history(cls): - to_remove = [x for x in cls.all_jobs() if x.status in [RenderStatus.CANCELLED, - RenderStatus.COMPLETED, RenderStatus.ERROR]] - for job_to_remove in to_remove: - cls.delete_job(job_to_remove) - cls.save_state() + def job_counts(cls): + job_counts = {} + for job_status in RenderStatus: + job_counts[job_status.value] = len(cls.jobs_with_status(job_status)) + return job_counts + + # -------------------------------------------- + # Startup / Shutdown: + # -------------------------------------------- @classmethod def load_state(cls, database_directory): @@ -127,6 +138,16 @@ class RenderQueue: cls.save_state() cls.session.close() + # -------------------------------------------- + # Renderer Availability: + # -------------------------------------------- + + @classmethod + def renderer_instances(cls): + from collections import Counter + all_instances = [x.renderer for x in cls.running_jobs()] + return Counter(all_instances) + @classmethod def is_available_for_job(cls, renderer, priority=2): @@ -136,24 +157,20 @@ class RenderQueue: maxed_out_instances = renderer in instances.keys() and instances[renderer] >= max_allowed_instances return not maxed_out_instances and not higher_priority_jobs + # -------------------------------------------- + # Job Lifecycle Management: + # -------------------------------------------- + @classmethod - def evaluate_queue(cls): - try: - not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True) - for job in not_started: - if cls.is_available_for_job(job.renderer, job.priority): - cls.start_job(job) - - scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True) - for job in scheduled: - if job.scheduled_start <= datetime.now(): - logger.debug(f"Starting scheduled job: {job}") - cls.start_job(job) - - if cls.last_saved_counts != cls.job_counts(): - cls.save_state() - except DetachedInstanceError: - pass + def add_to_render_queue(cls, render_job, force_start=False): + logger.info(f"Adding job to render queue: {render_job}") + cls.job_queue.append(render_job) + if cls.is_running and force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED): + cls.start_job(render_job) + cls.session.add(render_job) + cls.save_state() + if cls.is_running: + cls.evaluate_queue() @classmethod def start_job(cls, job): @@ -176,15 +193,14 @@ class RenderQueue: cls.save_state() return True - @classmethod - def renderer_instances(cls): - from collections import Counter - all_instances = [x.renderer for x in cls.running_jobs()] - return Counter(all_instances) + # -------------------------------------------- + # Miscellaneous: + # -------------------------------------------- @classmethod - def job_counts(cls): - job_counts = {} - for job_status in RenderStatus: - job_counts[job_status.value] = len(cls.jobs_with_status(job_status)) - return job_counts + def clear_history(cls): + to_remove = [x for x in cls.all_jobs() if x.status in [RenderStatus.CANCELLED, + RenderStatus.COMPLETED, RenderStatus.ERROR]] + for job_to_remove in to_remove: + cls.delete_job(job_to_remove) + cls.save_state()