diff --git a/lib/render_queue.py b/lib/render_queue.py index a3a56ad..7c9009f 100755 --- a/lib/render_queue.py +++ b/lib/render_queue.py @@ -1,20 +1,17 @@ -import json import logging -import os import platform from datetime import datetime import psutil import requests +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker -from .scheduled_job import ScheduledJob from .render_workers.render_worker import RenderStatus +from .scheduled_job import ScheduledJob, Base logger = logging.getLogger() -JSON_FILE = 'server_state.json' -#todo: move history to sqlite db - class JobNotFoundError(Exception): def __init__(self, job_id, *args): @@ -23,6 +20,11 @@ class JobNotFoundError(Exception): class RenderQueue: + engine = create_engine('sqlite:///database.db') + Base.metadata.create_all(engine) + Session = sessionmaker(bind=engine) + session = Session() + ScheduledJob.register_user_events() job_queue = [] render_clients = [] maximum_renderer_instances = {'blender': 2, 'aerender': 1, 'ffmpeg': 4} @@ -40,15 +42,21 @@ class RenderQueue: def add_to_render_queue(cls, render_job, force_start=False, client=None): if not client or render_job.client == cls.host_name: - logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job.worker)) + logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job.worker_object)) render_job.client = cls.host_name cls.job_queue.append(render_job) if force_start: cls.start_job(render_job) + cls.session.add(render_job) + cls.save_state() else: # todo: implement client rendering logger.warning('remote client rendering not implemented yet') + @classmethod + def all_jobs(cls): + return cls.job_queue + @classmethod def running_jobs(cls): return cls.jobs_with_status(RenderStatus.RUNNING) @@ -61,89 +69,33 @@ class RenderQueue: @classmethod def jobs_with_status(cls, status, priority_sorted=False): - found_jobs = [x for x in cls.job_queue if x.render_status() == 
status] + found_jobs = [x for x in cls.all_jobs() if x.render_status() == status] if priority_sorted: found_jobs = sorted(found_jobs, key=lambda a: a.priority, reverse=False) return found_jobs @classmethod def job_with_id(cls, job_id, none_ok=False): - found_job = next((x for x in cls.job_queue if x.id == job_id), None) + found_job = next((x for x in cls.all_jobs() if x.id == job_id), None) if not found_job and not none_ok: raise JobNotFoundError(job_id) return found_job @classmethod def clear_history(cls): - to_remove = [x for x in cls.job_queue if x.render_status() in [RenderStatus.CANCELLED, - RenderStatus.COMPLETED, RenderStatus.ERROR]] + to_remove = [x for x in cls.all_jobs() if x.render_status() in [RenderStatus.CANCELLED, + RenderStatus.COMPLETED, RenderStatus.ERROR]] for job_to_remove in to_remove: - cls.job_queue.remove(job_to_remove) + cls.delete_job(job_to_remove) cls.save_state() @classmethod - def load_state(cls, json_path=None): - """Load state history from JSON file""" - - input_path = json_path or JSON_FILE - if os.path.exists(input_path): - with open(input_path) as f: - - # load saved data - saved_state = json.load(f) - cls.render_clients = saved_state.get('clients', {}) - - for job in saved_state.get('jobs', []): - try: - render_job = ScheduledJob(renderer=job['renderer'], input_path=job['worker']['input_path'], - output_path=job['worker']['output_path'], args=job['worker']['args'], - priority=job['priority'], client=job['client']) - - # Load Worker values - for key, val in job['worker'].items(): - if val and key in ['start_time', 'end_time']: # convert date strings back into date objects - render_job.worker.__dict__[key] = datetime.fromisoformat(val) - else: - render_job.worker.__dict__[key] = val - - render_job.worker.status = RenderStatus[job['status'].upper()] - job.pop('worker', None) - - # Create RenderJob with re-created Renderer object - for key, val in job.items(): - if key in ['date_created']: # convert date strings back to datetime 
objects - render_job.__dict__[key] = datetime.fromisoformat(val) - else: - import types - if hasattr(render_job, key): - if getattr(render_job, key) and not isinstance(getattr(render_job, key), types.MethodType): - render_job.__dict__[key] = val - - # Handle older loaded jobs that were cancelled before closing - if render_job.render_status() == RenderStatus.RUNNING: - render_job.worker.status = RenderStatus.CANCELLED - - # finally add back to render queue - cls.job_queue.append(render_job) - except Exception as e: - logger.exception(f"Unable to load job: {job['id']} - {e}") - - cls.last_saved_counts = cls.job_counts() + def load_state(cls): + cls.job_queue = cls.session.query(ScheduledJob).all() @classmethod - def save_state(cls, json_path=None): - """Save state history to JSON file""" - try: - logger.debug("Saving Render History") - output = {'timestamp': datetime.now().isoformat(), - 'jobs': [j.json() for j in cls.job_queue], - 'clients': cls.render_clients} - output_path = json_path or JSON_FILE - with open(output_path, 'w') as f: - json.dump(output, f, indent=4) - cls.last_saved_counts = cls.job_counts() - except Exception as e: - logger.error("Error saving state JSON: {}".format(e)) + def save_state(cls): + cls.session.commit() @classmethod def evaluate_queue(cls): @@ -153,10 +105,9 @@ class RenderQueue: not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True) if not_started: for job in not_started: - renderer = job.worker.engine.name() higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < job.priority] - max_renderers = renderer in instances.keys() and instances[ - renderer] >= cls.maximum_renderer_instances.get(renderer, 1) + max_renderers = job.renderer in instances.keys() and instances[ + job.renderer] >= cls.maximum_renderer_instances.get(job.renderer, 1) if not max_renderers and not higher_priority_jobs: cls.start_job(job) @@ -172,7 +123,7 @@ class RenderQueue: @classmethod def start_job(cls, job): 
logger.info('Starting {}render: {} - Priority {}'.format('scheduled ' if job.scheduled_start else '', job.name, - job.priority)) + job.priority)) job.start() @classmethod @@ -186,12 +137,14 @@ class RenderQueue: logger.info(f"Deleting job ID: {job.id}") job.stop() cls.job_queue.remove(job) + cls.session.delete(job) + cls.save_state() return True @classmethod def renderer_instances(cls): from collections import Counter - all_instances = [x.worker.engine.name() for x in cls.running_jobs()] + all_instances = [x.renderer for x in cls.running_jobs()] return Counter(all_instances) @classmethod @@ -199,25 +152,22 @@ class RenderQueue: job_counts = {} for job_status in RenderStatus: job_counts[job_status.value] = len(cls.jobs_with_status(job_status)) return job_counts @classmethod def status(cls): - - stats = {"timestamp": datetime.now().isoformat(), - "platform": platform.platform(), - "cpu_percent": psutil.cpu_percent(percpu=False), - "cpu_percent_per_cpu": psutil.cpu_percent(percpu=True), - "cpu_count": psutil.cpu_count(), - "memory_total": psutil.virtual_memory().total, - "memory_available": psutil.virtual_memory().available, - "memory_percent": psutil.virtual_memory().percent, - "job_counts": cls.job_counts(), - "host_name": cls.host_name - } - - return stats + return {"timestamp": datetime.now().isoformat(), + "platform": platform.platform(), + "cpu_percent": psutil.cpu_percent(percpu=False), + "cpu_percent_per_cpu": psutil.cpu_percent(percpu=True), + "cpu_count": psutil.cpu_count(), + "memory_total": psutil.virtual_memory().total, + "memory_available": psutil.virtual_memory().available, + "memory_percent": psutil.virtual_memory().percent, + "job_counts": cls.job_counts(), + "host_name": cls.host_name + } @classmethod def register_client(cls, hostname): @@ -264,4 +213,3 @@ class RenderQueue: except requests.ConnectionError as e: pass return False - diff --git a/lib/render_workers/blender_worker.py b/lib/render_workers/blender_worker.py index 83abb40..eb99666 100644
--- a/lib/render_workers/blender_worker.py +++ b/lib/render_workers/blender_worker.py @@ -27,12 +27,6 @@ class Blender(BaseRenderEngine): formats = re.findall(r"'([A-Z_0-9]+)'", format_string) return formats - @classmethod - def full_report(cls): - return {'version': cls.version(), - 'help_text': cls.get_help(), - 'formats': cls.get_formats()} - @classmethod def run_python_expression(cls, project_path, python_expression): if os.path.exists(project_path): diff --git a/lib/render_workers/render_worker.py b/lib/render_workers/render_worker.py index 0012f3e..e6a0a77 100644 --- a/lib/render_workers/render_worker.py +++ b/lib/render_workers/render_worker.py @@ -4,6 +4,7 @@ import logging import os import subprocess import threading +import json from datetime import datetime from enum import Enum @@ -19,6 +20,7 @@ class RenderStatus(Enum): CANCELLED = "cancelled" ERROR = "error" SCHEDULED = "scheduled" + UNDEFINED = "undefined" def string_to_status(string): @@ -135,7 +137,7 @@ class BaseRenderWorker(object): while self.failed_attempts < self.maximum_attempts and self.status is not RenderStatus.COMPLETED: if self.failed_attempts: - logger.info('Attempt #{} failed. Starting attempt #{}'.format(self.failed_attempts, self.failed_attempts + 1)) + logger.info(f'Attempt #{self.failed_attempts} failed. 
Starting attempt #{self.failed_attempts + 1}') # Start process and get updates subprocess_cmds = self.generate_subprocess() @@ -208,14 +210,57 @@ class BaseRenderWorker(object): def _parse_stdout(self, line): raise NotImplementedError("_parse_stdout not implemented") - def elapsed_time(self): - elapsed = "" - if self.start_time: - if self.end_time: - elapsed = self.end_time - self.start_time - elif self.is_running(): - elapsed = datetime.now() - self.start_time - return elapsed + def time_elapsed(self): + + from string import Template + + class DeltaTemplate(Template): + delimiter = "%" + + def strfdelta(tdelta, fmt='%H:%M:%S'): + d = {"D": tdelta.days} + hours, rem = divmod(tdelta.seconds, 3600) + minutes, seconds = divmod(rem, 60) + d["H"] = '{:02d}'.format(hours) + d["M"] = '{:02d}'.format(minutes) + d["S"] = '{:02d}'.format(seconds) + t = DeltaTemplate(fmt) + return t.substitute(**d) + + # calculate elapsed time + elapsed_time = None + start_time = self.start_time + end_time = self.end_time + + if start_time: + if end_time: + elapsed_time = end_time - start_time + elif self.status == RenderStatus.RUNNING: + elapsed_time = datetime.now() - start_time + + elapsed_time_string = strfdelta(elapsed_time) if elapsed_time else "Unknown" + return elapsed_time_string + + def json(self): + worker_data = self.__dict__.copy() + worker_data['percent_complete'] = self.percent_complete() + worker_data['time_elapsed'] = self.time_elapsed() + worker_data['status'] = self.status.value + keys_to_remove = ['thread', 'process'] # remove unwanted keys from dict + for key in worker_data.keys(): + if key.startswith('_'): + keys_to_remove.append(key) + for key in keys_to_remove: + worker_data.pop(key, None) + + # convert to json and back to auto-convert dates to iso format + def date_serializer(o): + if isinstance(o, datetime): + return o.isoformat() + + json_convert = json.dumps(worker_data, default=date_serializer) + worker_json = json.loads(json_convert) + return worker_json class 
BaseRenderEngine(object): diff --git a/lib/scheduled_job.py b/lib/scheduled_job.py index 42171c5..e1b9858 100644 --- a/lib/scheduled_job.py +++ b/lib/scheduled_job.py @@ -3,72 +3,116 @@ import hashlib import json import logging import os -import threading import uuid from datetime import datetime +from sqlalchemy import Column, Integer, String, DateTime, JSON, event +from sqlalchemy.ext.declarative import declarative_base + from .render_workers.render_worker import RenderStatus, RenderWorkerFactory logger = logging.getLogger() +Base = declarative_base() -class ScheduledJob: +class ScheduledJob(Base): + __tablename__ = 'scheduled_jobs' + id = Column(String, primary_key=True) - # Get file hash on bg thread - def __get_file_hash(self): - if os.path.exists(self.worker.input_path): - self.file_hash = hashlib.md5(open(self.worker.input_path, 'rb').read()).hexdigest() + renderer = Column(String) + input_path = Column(String) + output_path = Column(String) + priority = Column(Integer) + owner = Column(String) + client = Column(String) + notify = Column(String) + date_created = Column(DateTime) + scheduled_start = Column(DateTime) + name = Column(String) + file_hash = Column(String) + worker_data_store = Column(JSON) def __init__(self, renderer, input_path, output_path, args, priority=2, owner=None, client=None, notify=None, custom_id=None, name=None): self.id = custom_id or self.generate_id() - self.owner = owner + + self.renderer = renderer + self.input_path = input_path + self.output_path = output_path self.priority = priority + self.owner = owner self.client = client self.notify = notify self.date_created = datetime.now() self.scheduled_start = None - self.renderer = renderer self.name = name or os.path.basename(input_path) + '_' + self.date_created.strftime("%Y.%m.%d_%H.%M.%S") - self.worker = RenderWorkerFactory.create_worker(renderer, input_path, output_path, args) - self.worker.log_path = os.path.join(os.path.dirname(input_path), self.name + '.log') - 
self.worker.validate() + self.worker_object = RenderWorkerFactory.create_worker(renderer, input_path, output_path, args) + self.worker_object.log_path = self.log_path() + self.worker_object.validate() self.file_hash = None - threading.Thread(target=self.__get_file_hash).start() # get file hash on bg thread + if not self.file_hash and os.path.exists(input_path): + self.file_hash = hashlib.md5(open(input_path, 'rb').read()).hexdigest() + + def worker_data(self): + if hasattr(self, 'worker_object'): + fetched = self.worker_object.json() + if fetched != self.worker_data_store: + self.worker_data_store = fetched + return self.worker_data_store + + @staticmethod + def before_insert(mapper, connection, target): + logger.debug(f"Before insert: {target.name}") + if hasattr(target, 'worker_object'): + target.worker_data_store = target.worker_object.json() + + @staticmethod + def before_update(mapper, connection, target): + logger.debug(f"Before update: {target.name}") + if hasattr(target, 'worker_object'): + target.worker_data_store = target.worker_object.json() + + @classmethod + def register_user_events(cls): + event.listen(cls, 'before_insert', cls.before_insert) + event.listen(cls, 'before_update', cls.before_update) def render_status(self): - if self.scheduled_start and self.worker.status == RenderStatus.NOT_STARTED: - return RenderStatus.SCHEDULED - else: - return self.worker.status - - def file_hash(self): - if os.path.exists(self.worker.input_path): - return hashlib.md5(open(self.worker.input_path, 'rb').read()).hexdigest() - return None + try: + worker_status = RenderStatus(self.worker_data()['status']) + if self.scheduled_start and worker_status == RenderStatus.NOT_STARTED: + return RenderStatus.SCHEDULED + else: + return worker_status + except Exception as e: + logger.error(f"Exception fetching render status: {e}") + return RenderStatus.UNDEFINED def json(self): """Converts RenderJob into JSON-friendly dict""" job_dict = None try: - job_dict = self.__dict__.copy() 
- job_dict['status'] = self.render_status().value - job_dict['time_elapsed'] = self.time_elapsed() if type(self.time_elapsed) != str else self.time_elapsed - job_dict['file_hash'] = self.file_hash - job_dict['percent_complete'] = self.percent_complete() - job_dict['file_list'] = self.file_list() - job_dict['worker'] = self.worker.__dict__.copy() - job_dict['worker']['status'] = job_dict['status'] - - # remove unwanted keys from dict - keys_to_remove = ['thread', 'process'] - for key in job_dict['worker'].keys(): - if key.startswith('_'): - keys_to_remove.append(key) - for key in keys_to_remove: - job_dict['worker'].pop(key, None) + job_dict = { + 'id': self.id, + 'name': self.name, + 'input_path': self.input_path, + 'output_path': self.output_path, + 'priority': self.priority, + 'owner': self.owner, + 'client': self.client, + 'notify': self.notify, + 'date_created': self.date_created, + 'scheduled_start': self.scheduled_start, + 'status': self.render_status().value, + 'time_elapsed': self.worker_data().get('time_elapsed', None), + 'file_hash': self.file_hash, + 'percent_complete': self.worker_data().get('percent_complete', None), + 'file_list': self.file_list(), + 'renderer': self.renderer, + 'worker': self.worker_data(), + } # convert to json and back to auto-convert dates to iso format def date_serializer(o): @@ -82,58 +126,23 @@ class ScheduledJob: return job_dict def start(self): - self.worker.start() + if hasattr(self, 'worker_object'): + self.worker_object.start() def stop(self): - self.worker.stop() - - def time_elapsed(self): - - from string import Template - - class DeltaTemplate(Template): - delimiter = "%" - - def strfdelta(tdelta, fmt='%H:%M:%S'): - d = {"D": tdelta.days} - hours, rem = divmod(tdelta.seconds, 3600) - minutes, seconds = divmod(rem, 60) - d["H"] = '{:02d}'.format(hours) - d["M"] = '{:02d}'.format(minutes) - d["S"] = '{:02d}'.format(seconds) - t = DeltaTemplate(fmt) - return t.substitute(**d) - - # calculate elapsed time - elapsed_time = 
None - start_time = self.worker.start_time - end_time = self.worker.end_time - - if start_time: - if end_time: - elapsed_time = end_time - start_time - elif self.render_status() == RenderStatus.RUNNING: - elapsed_time = datetime.now() - start_time - - elapsed_time_string = strfdelta(elapsed_time) if elapsed_time else "Unknown" - return elapsed_time_string - - def frame_count(self): - return self.worker.total_frames + if hasattr(self, 'worker_object'): + self.worker_object.stop() def work_path(self): - return os.path.dirname(self.worker.output_path) + return os.path.dirname(self.output_path) def file_list(self): - job_dir = os.path.dirname(self.worker.output_path) + job_dir = os.path.dirname(self.output_path) return glob.glob(os.path.join(job_dir, '*')) def log_path(self): - return self.worker.log_path - - def percent_complete(self): - return self.worker.percent_complete() + return os.path.join(os.path.dirname(self.input_path), self.name + '.log') @classmethod def generate_id(cls): - return str(uuid.uuid4()).split('-')[0] \ No newline at end of file + return str(uuid.uuid4()).split('-')[0] diff --git a/lib/server/job_server.py b/lib/server/job_server.py index c8d04f0..d6cf488 100755 --- a/lib/server/job_server.py +++ b/lib/server/job_server.py @@ -49,7 +49,7 @@ def index(): with open('config/presets.yaml') as f: presets = yaml.load(f, Loader=yaml.FullLoader) - return render_template('index.html', all_jobs=sorted_jobs(RenderQueue.job_queue), + return render_template('index.html', all_jobs=sorted_jobs(RenderQueue.all_jobs()), hostname=RenderQueue.host_name, renderer_info=renderer_info(), render_clients=RenderQueue.render_clients, preset_list=presets) @@ -101,7 +101,7 @@ def get_job_file(job_id, filename): @server.get('/api/jobs') def jobs_json(): - return [x.json() for x in RenderQueue.job_queue] + return [x.json() for x in RenderQueue.all_jobs()] @server.get('/api/jobs/') @@ -127,7 +127,7 @@ def get_job_status(job_id): @server.get('/api/job//logs') def 
get_job_logs(job_id): found_job = RenderQueue.job_with_id(job_id) - log_path = found_job.worker.log_path + log_path = found_job.log_path() log_data = None if log_path and os.path.exists(log_path): with open(log_path) as file: @@ -151,9 +151,9 @@ def download_all(job_id): return response found_job = RenderQueue.job_with_id(job_id) - output_dir = os.path.dirname(found_job.worker.output_path) + output_dir = os.path.dirname(found_job.output_path) if os.path.exists(output_dir): - zip_filename = os.path.join('/tmp', pathlib.Path(found_job.worker.input_path).stem + '.zip') + zip_filename = os.path.join('/tmp', pathlib.Path(found_job.input_path).stem + '.zip') with ZipFile(zip_filename, 'w') as zipObj: for f in os.listdir(output_dir): zipObj.write(filename=os.path.join(output_dir, f), @@ -219,7 +219,7 @@ def full_status(): @server.get('/api/snapshot') def snapshot(): server_status = RenderQueue.status() - server_jobs = [x.json() for x in RenderQueue.job_queue] + server_jobs = [x.json() for x in RenderQueue.all_jobs()] server_data = {'status': server_status, 'jobs': server_jobs, 'timestamp': datetime.now().isoformat()} return server_data @@ -405,7 +405,7 @@ def delete_job(job_id): os.remove(thumb_path) # See if we own the input file (i.e. 
was it uploaded) - input_dir = os.path.dirname(found_job.worker.input_path) + input_dir = os.path.dirname(found_job.input_path) if server.config['UPLOAD_FOLDER'] in input_dir and os.path.exists(input_dir): shutil.rmtree(input_dir) diff --git a/lib/server/static/js/job_table.js b/lib/server/static/js/job_table.js index ba2d63c..d99039f 100644 --- a/lib/server/static/js/job_table.js +++ b/lib/server/static/js/job_table.js @@ -10,7 +10,7 @@ columns: [ data: (row) => row.name, formatter: (name, row) => gridjs.html(`${name}`) }, - { id: 'renderer', data: (row) => `${row.renderer}-${row.worker.renderer_version}`, name: 'Renderer' }, + { id: 'renderer', data: (row) => `${row.renderer}-${row.worker?.renderer_version}`, name: 'Renderer' }, { id: 'priority', name: 'Priority' }, { id: 'status', name: 'Status', @@ -21,9 +21,9 @@ columns: [ value="${(parseFloat(cell.percent_complete) * 100.0)}" max="100">${cell.status} `)}, { id: 'time_elapsed', name: 'Time Elapsed' }, - { data: (row) => row.worker.total_frames, name: 'Frame Count' }, + { data: (row) => row.worker?.total_frames ?? 'N/A', name: 'Frame Count' }, { id: 'client', name: 'Client'}, - { data: (row) => row.worker.last_output, + { data: (row) => row.worker?.last_output ?? 
'N/A', name: 'Last Output', formatter: (output, row) => gridjs.html(`${output}`) }, diff --git a/lib/utilities/server_helper.py b/lib/utilities/server_helper.py index 1929278..d91d725 100644 --- a/lib/utilities/server_helper.py +++ b/lib/utilities/server_helper.py @@ -33,7 +33,7 @@ def generate_thumbnail_for_job(job, thumb_video_path, thumb_image_path, max_widt elif len(job.file_list()) > 1: # if image sequence, use second to last file (last may be in use) source_path = [job.file_list()[-2]] else: - source_path = [job.worker.input_path] # use source if nothing else + source_path = [job.input_path] # use source if nothing else if source_path: # Todo: convert image sequence to animated movie diff --git a/requirements.txt b/requirements.txt index 6862f32..c751d33 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,11 +1,12 @@ click~=8.1.3 -requests==2.28.1 -psutil~=5.9.0 +requests==2.31.0 +psutil==5.9.5 PyYAML~=6.0 -Flask==2.2.2 -rich==12.6.0 +Flask==2.3.2 +rich==13.3.5 ffmpeg-python -Werkzeug~=2.2.2 +Werkzeug==2.3.4 tkinterdnd2~=0.3.0 -future~=0.18.2 -json2html~=1.3.0 \ No newline at end of file +future==0.18.3 +json2html~=1.3.0 +SQLAlchemy~=2.0.15 \ No newline at end of file