From e11c5e7e58d071b698c63090f56d9a9287b08e70 Mon Sep 17 00:00:00 2001
From: Brett Williams
Date: Wed, 24 May 2023 09:58:02 -0500
Subject: [PATCH] Convert render_queue and scheduledjob to use sql instead of json

---
 lib/render_queue.py            | 127 +++++++++------------------
 lib/scheduled_job.py           |  85 +++++++++-------------
 lib/server/job_server.py       |  14 ++--
 lib/utilities/server_helper.py |   2 +-
 4 files changed, 76 insertions(+), 152 deletions(-)

diff --git a/lib/render_queue.py b/lib/render_queue.py
index 4212228..6db6708 100755
--- a/lib/render_queue.py
+++ b/lib/render_queue.py
@@ -1,22 +1,17 @@
-import json
 import logging
-import os
 import platform
 from datetime import datetime
-from sqlalchemy import create_engine
-from sqlalchemy.orm import sessionmaker
 
 import psutil
 import requests
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker
 
-from .scheduled_job import ScheduledJob, Base
 from .render_workers.render_worker import RenderStatus
+from .scheduled_job import ScheduledJob, Base
 
 logger = logging.getLogger()
 
-JSON_FILE = 'server_state.json'
-#todo: move history to sqlite db
-
 
 class JobNotFoundError(Exception):
     def __init__(self, job_id, *args):
@@ -25,7 +20,6 @@ class JobNotFoundError(Exception):
 
 
 class RenderQueue:
-
     engine = create_engine('sqlite:///database.db')
     Base.metadata.create_all(engine)
     Session = sessionmaker(bind=engine)
@@ -47,17 +41,21 @@ class RenderQueue:
     def add_to_render_queue(cls, render_job, force_start=False, client=None):
 
         if not client or render_job.client == cls.host_name:
-            logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job.worker))
+            logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job.worker()))
            render_job.client = cls.host_name
             cls.job_queue.append(render_job)
             if force_start:
                 cls.start_job(render_job)
             cls.session.add(render_job)
-            cls.session.commit()
+            cls.save_state()
         else:
             # todo: implement client rendering
             logger.warning('remote client rendering not implemented yet')
 
+    @classmethod
+    def all_jobs(cls):
+        return cls.job_queue
+
     @classmethod
     def running_jobs(cls):
         return cls.jobs_with_status(RenderStatus.RUNNING)
@@ -70,89 +68,33 @@ class RenderQueue:
 
     @classmethod
     def jobs_with_status(cls, status, priority_sorted=False):
-        found_jobs = [x for x in cls.job_queue if x.render_status() == status]
+        found_jobs = [x for x in cls.all_jobs() if x.render_status() == status]
         if priority_sorted:
             found_jobs = sorted(found_jobs, key=lambda a: a.priority, reverse=False)
         return found_jobs
 
     @classmethod
     def job_with_id(cls, job_id, none_ok=False):
-        found_job = next((x for x in cls.job_queue if x.id == job_id), None)
+        found_job = next((x for x in cls.all_jobs() if x.id == job_id), None)
         if not found_job and not none_ok:
             raise JobNotFoundError(job_id)
         return found_job
 
     @classmethod
     def clear_history(cls):
-        to_remove = [x for x in cls.job_queue if x.render_status() in [RenderStatus.CANCELLED,
-                                                                       RenderStatus.COMPLETED, RenderStatus.ERROR]]
+        to_remove = [x for x in cls.all_jobs() if x.render_status() in [RenderStatus.CANCELLED,
+                                                                        RenderStatus.COMPLETED, RenderStatus.ERROR]]
         for job_to_remove in to_remove:
             cls.job_queue.remove(job_to_remove)
         cls.save_state()
 
     @classmethod
-    def load_state(cls, json_path=None):
-        """Load state history from JSON file"""
-
-        input_path = json_path or JSON_FILE
-        if os.path.exists(input_path):
-            with open(input_path) as f:
-
-                # load saved data
-                saved_state = json.load(f)
-                cls.render_clients = saved_state.get('clients', {})
-
-                for job in saved_state.get('jobs', []):
-                    try:
-                        render_job = ScheduledJob(renderer=job['renderer'], input_path=job['worker']['input_path'],
-                                                  output_path=job['worker']['output_path'], args=job['worker']['args'],
-                                                  priority=job['priority'], client=job['client'])
-
-                        # Load Worker values
-                        for key, val in job['worker'].items():
-                            if val and key in ['start_time', 'end_time']:  # convert date strings back into date objects
-                                render_job.worker.__dict__[key] = datetime.fromisoformat(val)
-                            else:
-                                render_job.worker.__dict__[key] = val
-
-                        render_job.worker.status = RenderStatus[job['status'].upper()]
-                        job.pop('worker', None)
-
-                        # Create RenderJob with re-created Renderer object
-                        for key, val in job.items():
-                            if key in ['date_created']:  # convert date strings back to datetime objects
-                                render_job.__dict__[key] = datetime.fromisoformat(val)
-                            else:
-                                import types
-                                if hasattr(render_job, key):
-                                    if getattr(render_job, key) and not isinstance(getattr(render_job, key), types.MethodType):
-                                        render_job.__dict__[key] = val
-
-                        # Handle older loaded jobs that were cancelled before closing
-                        if render_job.render_status() == RenderStatus.RUNNING:
-                            render_job.worker.status = RenderStatus.CANCELLED
-
-                        # finally add back to render queue
-                        cls.job_queue.append(render_job)
-                    except Exception as e:
-                        logger.exception(f"Unable to load job: {job['id']} - {e}")
-
-        cls.last_saved_counts = cls.job_counts()
+    def load_state(cls):
+        cls.job_queue = cls.session.query(ScheduledJob).all()
 
     @classmethod
-    def save_state(cls, json_path=None):
-        """Save state history to JSON file"""
-        try:
-            logger.debug("Saving Render History")
-            output = {'timestamp': datetime.now().isoformat(),
-                      'jobs': [j.json() for j in cls.job_queue],
-                      'clients': cls.render_clients}
-            output_path = json_path or JSON_FILE
-            with open(output_path, 'w') as f:
-                json.dump(output, f, indent=4)
-            cls.last_saved_counts = cls.job_counts()
-        except Exception as e:
-            logger.error("Error saving state JSON: {}".format(e))
+    def save_state(cls):
+        cls.session.commit()
 
     @classmethod
     def evaluate_queue(cls):
@@ -162,7 +104,7 @@ class RenderQueue:
         not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
         if not_started:
             for job in not_started:
-                renderer = job.worker.engine.name()
+                renderer = job.worker().engine.name()
                 higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < job.priority]
                 max_renderers = renderer in instances.keys() and instances[
                     renderer] >= cls.maximum_renderer_instances.get(renderer, 1)
@@ -181,7 +123,7 @@ class RenderQueue:
     @classmethod
     def start_job(cls, job):
         logger.info('Starting {}render: {} - Priority {}'.format('scheduled ' if job.scheduled_start else '', job.name,
-                                                                 job.priority))
+                                                                  job.priority))
         job.start()
 
     @classmethod
@@ -195,12 +137,14 @@ class RenderQueue:
         logger.info(f"Deleting job ID: {job.id}")
         job.stop()
         cls.job_queue.remove(job)
+        cls.session.delete(job)
+        cls.save_state()
         return True
 
     @classmethod
     def renderer_instances(cls):
         from collections import Counter
-        all_instances = [x.worker.engine.name() for x in cls.running_jobs()]
+        all_instances = [x.worker().engine.name() for x in cls.running_jobs()]
         return Counter(all_instances)
 
     @classmethod
@@ -208,25 +152,21 @@ class RenderQueue:
         job_counts = {}
         for job_status in RenderStatus:
             job_counts[job_status.value] = len(cls.jobs_with_status(job_status))
-
         return job_counts
 
     @classmethod
     def status(cls):
-
-        stats = {"timestamp": datetime.now().isoformat(),
-                 "platform": platform.platform(),
-                 "cpu_percent": psutil.cpu_percent(percpu=False),
-                 "cpu_percent_per_cpu": psutil.cpu_percent(percpu=True),
-                 "cpu_count": psutil.cpu_count(),
-                 "memory_total": psutil.virtual_memory().total,
-                 "memory_available": psutil.virtual_memory().available,
-                 "memory_percent": psutil.virtual_memory().percent,
-                 "job_counts": cls.job_counts(),
-                 "host_name": cls.host_name
-                 }
-
-        return stats
+        return {"timestamp": datetime.now().isoformat(),
+                "platform": platform.platform(),
+                "cpu_percent": psutil.cpu_percent(percpu=False),
+                "cpu_percent_per_cpu": psutil.cpu_percent(percpu=True),
+                "cpu_count": psutil.cpu_count(),
+                "memory_total": psutil.virtual_memory().total,
+                "memory_available": psutil.virtual_memory().available,
+                "memory_percent": psutil.virtual_memory().percent,
+                "job_counts": cls.job_counts(),
+                "host_name": cls.host_name
+                }
 
     @classmethod
     def register_client(cls, hostname):
@@ -273,4 +213,3 @@ class RenderQueue:
         except requests.ConnectionError as e:
             pass
         return False
-
diff --git a/lib/scheduled_job.py b/lib/scheduled_job.py
index 49a9859..6139605 100644
--- a/lib/scheduled_job.py
+++ b/lib/scheduled_job.py
@@ -3,7 +3,6 @@
 import hashlib
 import json
 import logging
 import os
-import threading
 import uuid
 from datetime import datetime
@@ -20,10 +19,17 @@ class ScheduledJob(Base):
     __tablename__ = 'scheduled_jobs'
     id = Column(String, primary_key=True)
 
-    # Get file hash on bg thread
-    def __get_file_hash(self):
-        if os.path.exists(self.worker.input_path):
-            self.file_hash = hashlib.md5(open(self.worker.input_path, 'rb').read()).hexdigest()
+    renderer = Column(String)
+    input_path = Column(String)
+    output_path = Column(String)
+    priority = Column(Integer)
+    owner = Column(String)
+    client = Column(String)
+    notify = Column(String)
+    date_created = Column(DateTime)
+    scheduled_start = Column(DateTime)
+    name = Column(String)
+    file_hash = Column(String)
 
     def __init__(self, renderer, input_path, output_path, args, priority=2, owner=None, client=None, notify=None,
                  custom_id=None, name=None):
@@ -45,18 +51,23 @@ class ScheduledJob(Base):
         self.worker_object.validate()
 
         self.file_hash = None
-        threading.Thread(target=self.__get_file_hash).start()  # get file hash on bg thread
+        if not self.file_hash and os.path.exists(input_path):
+            self.file_hash = hashlib.md5(open(input_path, 'rb').read()).hexdigest()
+
+    def worker(self):
+        if hasattr(self, 'worker_object'):
+            return self.worker_object
+        else:
+            return {}
 
     def render_status(self):
-        if self.scheduled_start and self.worker.status == RenderStatus.NOT_STARTED:
-            return RenderStatus.SCHEDULED
-        else:
-            return self.worker.status
-
-    def file_hash(self):
-        if os.path.exists(self.worker.input_path):
-            return hashlib.md5(open(self.worker.input_path, 'rb').read()).hexdigest()
-        return None
+        try:
+            if self.scheduled_start and self.worker().status == RenderStatus.NOT_STARTED:
+                return RenderStatus.SCHEDULED
+            else:
+                return self.worker().status
+        except:
+            return RenderStatus.CANCELLED
 
     def json(self):
         """Converts RenderJob into JSON-friendly dict"""
@@ -94,44 +105,15 @@ class ScheduledJob(Base):
         return job_dict
 
     def start(self):
-        self.worker.start()
+        if hasattr(self, 'worker'):
+            self.worker().start()
 
     def stop(self):
-        self.worker.stop()
-
-    def time_elapsed(self):
-
-        from string import Template
-
-        class DeltaTemplate(Template):
-            delimiter = "%"
-
-        def strfdelta(tdelta, fmt='%H:%M:%S'):
-            d = {"D": tdelta.days}
-            hours, rem = divmod(tdelta.seconds, 3600)
-            minutes, seconds = divmod(rem, 60)
-            d["H"] = '{:02d}'.format(hours)
-            d["M"] = '{:02d}'.format(minutes)
-            d["S"] = '{:02d}'.format(seconds)
-            t = DeltaTemplate(fmt)
-            return t.substitute(**d)
-
-        # calculate elapsed time
-        elapsed_time = None
-        start_time = self.worker.start_time
-        end_time = self.worker.end_time
-
-        if start_time:
-            if end_time:
-                elapsed_time = end_time - start_time
-            elif self.render_status() == RenderStatus.RUNNING:
-                elapsed_time = datetime.now() - start_time
-
-        elapsed_time_string = strfdelta(elapsed_time) if elapsed_time else "Unknown"
-        return elapsed_time_string
+        if hasattr(self, 'worker'):
+            self.worker().stop()
 
     def frame_count(self):
-        return self.worker.total_frames
+        return self.worker().total_frames
 
     def work_path(self):
         return os.path.dirname(self.output_path)
@@ -144,7 +126,10 @@ class ScheduledJob(Base):
         return os.path.join(os.path.dirname(self.input_path), self.name + '.log')
 
     def percent_complete(self):
-        return self.worker.percent_complete()
+        try:
+            return self.worker().percent_complete()
+        except:
+            return -1
 
     @classmethod
     def generate_id(cls):
diff --git a/lib/server/job_server.py b/lib/server/job_server.py
index c8d04f0..d6cf488 100755
--- a/lib/server/job_server.py
+++ b/lib/server/job_server.py
@@ -49,7 +49,7 @@ def index():
     with open('config/presets.yaml') as f:
         presets = yaml.load(f, Loader=yaml.FullLoader)
 
-    return render_template('index.html', all_jobs=sorted_jobs(RenderQueue.job_queue),
+    return render_template('index.html', all_jobs=sorted_jobs(RenderQueue.all_jobs()),
                            hostname=RenderQueue.host_name, renderer_info=renderer_info(),
                            render_clients=RenderQueue.render_clients, preset_list=presets)
 
@@ -101,7 +101,7 @@ def get_job_file(job_id, filename):
 
 @server.get('/api/jobs')
 def jobs_json():
-    return [x.json() for x in RenderQueue.job_queue]
+    return [x.json() for x in RenderQueue.all_jobs()]
 
 
 @server.get('/api/jobs/<job_id>')
@@ -127,7 +127,7 @@ def get_job_status(job_id):
 @server.get('/api/job/<job_id>/logs')
 def get_job_logs(job_id):
     found_job = RenderQueue.job_with_id(job_id)
-    log_path = found_job.worker.log_path
+    log_path = found_job.log_path()
     log_data = None
     if log_path and os.path.exists(log_path):
         with open(log_path) as file:
@@ -151,9 +151,9 @@ def download_all(job_id):
         return response
 
     found_job = RenderQueue.job_with_id(job_id)
-    output_dir = os.path.dirname(found_job.worker.output_path)
+    output_dir = os.path.dirname(found_job.output_path)
     if os.path.exists(output_dir):
-        zip_filename = os.path.join('/tmp', pathlib.Path(found_job.worker.input_path).stem + '.zip')
+        zip_filename = os.path.join('/tmp', pathlib.Path(found_job.input_path).stem + '.zip')
         with ZipFile(zip_filename, 'w') as zipObj:
             for f in os.listdir(output_dir):
                 zipObj.write(filename=os.path.join(output_dir, f),
@@ -219,7 +219,7 @@ def full_status():
 
 @server.get('/api/snapshot')
 def snapshot():
     server_status = RenderQueue.status()
-    server_jobs = [x.json() for x in RenderQueue.job_queue]
+    server_jobs = [x.json() for x in RenderQueue.all_jobs()]
     server_data = {'status': server_status, 'jobs': server_jobs, 'timestamp': datetime.now().isoformat()}
     return server_data
@@ -405,7 +405,7 @@ def delete_job(job_id):
             os.remove(thumb_path)
 
         # See if we own the input file (i.e. was it uploaded)
-        input_dir = os.path.dirname(found_job.worker.input_path)
+        input_dir = os.path.dirname(found_job.input_path)
         if server.config['UPLOAD_FOLDER'] in input_dir and os.path.exists(input_dir):
             shutil.rmtree(input_dir)
 
diff --git a/lib/utilities/server_helper.py b/lib/utilities/server_helper.py
index 1929278..d91d725 100644
--- a/lib/utilities/server_helper.py
+++ b/lib/utilities/server_helper.py
@@ -33,7 +33,7 @@ def generate_thumbnail_for_job(job, thumb_video_path, thumb_image_path, max_widt
     elif len(job.file_list()) > 1:  # if image sequence, use second to last file (last may be in use)
         source_path = [job.file_list()[-2]]
     else:
-        source_path = [job.worker.input_path]  # use source if nothing else
+        source_path = [job.input_path]  # use source if nothing else
 
     if source_path:
         # Todo: convert image sequence to animated movie
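
For context, a minimal standalone sketch of the SQLAlchemy persistence pattern this patch moves to (declarative model, engine bound to a SQLite file, session add/commit/query/delete). The Job model, the reuse of the database.db filename, and the sample values below are illustrative assumptions, not code from the repository.

    # Standalone sketch of the pattern adopted above; only SQLAlchemy is assumed.
    from sqlalchemy import create_engine, Column, String, Integer
    from sqlalchemy.orm import sessionmaker, declarative_base

    Base = declarative_base()


    class Job(Base):
        __tablename__ = 'jobs'
        id = Column(String, primary_key=True)
        renderer = Column(String)
        priority = Column(Integer)


    engine = create_engine('sqlite:///database.db')   # same engine/URL style as RenderQueue
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine)()

    session.add(Job(id='abc123', renderer='blender', priority=2))  # add_to_render_queue path
    session.commit()                                               # save_state
    jobs = session.query(Job).all()                                # load_state
    session.delete(jobs[0])                                        # delete path
    session.commit()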