diff --git a/lib/render_queue.py b/lib/render_queue.py index 5be9dd6..ebcba6b 100755 --- a/lib/render_queue.py +++ b/lib/render_queue.py @@ -7,8 +7,7 @@ import requests from sqlalchemy import create_engine, Column, String, Integer from sqlalchemy.orm import sessionmaker -from .render_workers.base_worker import RenderStatus -from .scheduled_job import ScheduledJob, Base +from .render_workers.base_worker import RenderStatus, BaseRenderWorker, Base logger = logging.getLogger() @@ -45,7 +44,6 @@ class RenderQueue: Base.metadata.create_all(engine) Session = sessionmaker(bind=engine) session = Session() - ScheduledJob.register_user_events() job_queue = [] maximum_renderer_instances = {'blender': 1, 'aerender': 1, 'ffmpeg': 4} hostname = None @@ -62,7 +60,7 @@ class RenderQueue: def add_to_render_queue(cls, render_job, force_start=False, client=None): if not client or render_job.client == cls.hostname: - logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job.worker_object)) + logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job)) render_job.client = cls.hostname cls.job_queue.append(render_job) if force_start: @@ -89,7 +87,7 @@ class RenderQueue: @classmethod def jobs_with_status(cls, status, priority_sorted=False): - found_jobs = [x for x in cls.all_jobs() if x.render_status() == status] + found_jobs = [x for x in cls.all_jobs() if x.status == status] if priority_sorted: found_jobs = sorted(found_jobs, key=lambda a: a.priority, reverse=False) return found_jobs @@ -103,7 +101,7 @@ class RenderQueue: @classmethod def clear_history(cls): - to_remove = [x for x in cls.all_jobs() if x.render_status() in [RenderStatus.CANCELLED, + to_remove = [x for x in cls.all_jobs() if x.status in [RenderStatus.CANCELLED, RenderStatus.COMPLETED, RenderStatus.ERROR]] for job_to_remove in to_remove: cls.delete_job(job_to_remove) @@ -111,7 +109,7 @@ class RenderQueue: @classmethod def load_state(cls): - cls.job_queue = cls.session.query(ScheduledJob).all() + cls.job_queue = cls.session.query(BaseRenderWorker).all() @classmethod def save_state(cls): @@ -141,15 +139,14 @@ class RenderQueue: @classmethod def start_job(cls, job): - logger.info('Starting {}render: {} - Priority {}'.format('scheduled ' if job.scheduled_start else '', job.name, - job.priority)) + logger.info(f'Starting render: {job.name} - Priority {job.priority}') job.start() @classmethod def cancel_job(cls, job): logger.info(f'Cancelling job ID: {job.id}') job.stop() - return job.render_status() == RenderStatus.CANCELLED + return job.status == RenderStatus.CANCELLED @classmethod def delete_job(cls, job): diff --git a/lib/render_workers/aerender_worker.py b/lib/render_workers/aerender_worker.py index 5a47a8e..48ba2ae 100644 --- a/lib/render_workers/aerender_worker.py +++ b/lib/render_workers/aerender_worker.py @@ -22,9 +22,10 @@ class AERenderWorker(BaseRenderWorker): supported_extensions = ['.aep'] engine = AERender - def __init__(self, input_path, output_path, args=None): - super(AERenderWorker, self).__init__(input_path=input_path, output_path=output_path, ignore_extensions=False, - args=args) + def __init__(self, input_path, output_path, priority=2, args=None, owner=None, + client=None, name=None): + super(AERenderWorker, self).__init__(input_path=input_path, output_path=output_path, args=args, + client=client, priority=priority, owner=owner, name=name) self.comp = args.get('comp', None) self.render_settings = args.get('render_settings', None) diff --git a/lib/render_workers/base_worker.py b/lib/render_workers/base_worker.py index bd9f34f..ef4bf9f 100644 --- a/lib/render_workers/base_worker.py +++ b/lib/render_workers/base_worker.py @@ -5,12 +5,17 @@ import os import subprocess import threading import json +import glob from datetime import datetime from enum import Enum +from sqlalchemy import Column, Integer, String, DateTime, JSON, event +from sqlalchemy.ext.declarative import declarative_base + import psutil logger = logging.getLogger() +Base = declarative_base() class RenderStatus(Enum): @@ -27,14 +32,32 @@ def string_to_status(string): for stat in RenderStatus: if stat.value == string: return stat - return RenderStatus.ERROR + return RenderStatus.UNDEFINED -class BaseRenderWorker(object): +class BaseRenderWorker(Base): + __tablename__ = 'render_workers' + + id = Column(String, primary_key=True) + input_path = Column(String) + output_path = Column(String) + date_created = Column(DateTime) + start_time = Column(DateTime) + end_time = Column(DateTime) + renderer = Column(String) + renderer_version = Column(String) + priority = Column(Integer) + total_frames = Column(Integer) + owner = Column(String) + client = Column(String) + name = Column(String) + file_hash = Column(String) + _status = Column(String) engine = None - def __init__(self, input_path, output_path, args=None, ignore_extensions=True): + def __init__(self, input_path, output_path, priority=2, args=None, ignore_extensions=True, owner=None, client=None, + name=None): if not ignore_extensions: if not any(ext in input_path for ext in self.engine.supported_extensions): @@ -44,19 +67,28 @@ class BaseRenderWorker(object): if not self.engine: raise NotImplementedError("Engine not defined") + def generate_id(): + import uuid + return str(uuid.uuid4()).split('-')[0] + # Essential Info + self.id = generate_id() self.input_path = input_path self.output_path = output_path self.args = args or {} self.date_created = datetime.now() + self.renderer = self.engine.name() self.renderer_version = self.engine.version() + self.priority = priority + self.owner = owner + self.client = client + self.name = name # Frame Ranges self.total_frames = 0 self.current_frame = 0 # Logging - self.log_path = None self.start_time = None self.end_time = None @@ -74,6 +106,18 @@ class BaseRenderWorker(object): self.is_finished = False self.last_output = None + @property + def status(self): + return self._status + + @status.setter + def status(self, value): + self._status = value.value + + @status.getter + def status(self): + return string_to_status(self._status) + def validate(self): if not os.path.exists(self.input_path): raise FileNotFoundError(f"Cannot find input path: {self.input_path}") @@ -101,6 +145,11 @@ class BaseRenderWorker(object): def generate_worker_subprocess(self): raise NotImplementedError("generate_worker_subprocess not implemented") + def log_path(self): + filename = (self.name or os.path.basename(self.input_path)) + '_' + \ + self.date_created.strftime("%Y.%m.%d_%H.%M.%S") + '.log' + return os.path.join(os.path.dirname(self.input_path), filename) + def start(self): if not os.path.exists(self.input_path): @@ -122,17 +171,9 @@ class BaseRenderWorker(object): self.__thread.start() def run(self): - # Setup logging - try: - if not self.log_path: - log_dir = os.path.join(os.path.dirname(self.input_path), 'logs') - if not os.path.exists(log_dir): - os.makedirs(log_dir) - self.log_path = os.path.join(log_dir, os.path.basename(self.input_path)) + '.log' - logger.info('Logs saved in {}'.format(self.log_path)) - except Exception as e: - logger.error("Error setting up logging: {}".format(e)) + log_dir = os.path.dirname(self.log_path()) + os.makedirs(log_dir, exist_ok=True) while self.failed_attempts < self.maximum_attempts and self.status is not RenderStatus.COMPLETED: @@ -146,7 +187,7 @@ class BaseRenderWorker(object): universal_newlines=False) self.start_time = datetime.now() - with open(self.log_path, "a") as f: + with open(self.log_path(), "a") as f: f.write(f"{self.start_time.isoformat()} - Starting {self.engine.name()} {self.engine.version()} " f"Render for {self.input_path}") @@ -257,24 +298,41 @@ class BaseRenderWorker(object): elapsed_time_string = strfdelta(elapsed_time) if elapsed_time else "Unknown" return elapsed_time_string + def file_list(self): + job_dir = os.path.dirname(self.output_path) + file_list = glob.glob(os.path.join(job_dir, '*')) + file_list.sort() + return file_list + def json(self): - worker_data = self.__dict__.copy() - worker_data['percent_complete'] = self.percent_complete() - worker_data['time_elapsed'] = self.time_elapsed() - worker_data['status'] = self.status.value - keys_to_remove = ['thread', 'process'] # remove unwanted keys from dict - for key in worker_data.keys(): - if key.startswith('_'): - keys_to_remove.append(key) - for key in keys_to_remove: - worker_data.pop(key, None) + job_dict = { + 'id': self.id, + 'name': self.name, + 'input_path': self.input_path, + 'output_path': self.output_path, + 'priority': self.priority, + 'owner': self.owner, + 'client': self.client, + 'date_created': self.date_created, + 'start_time': self.start_time, + 'status': self.status.value, + 'time_elapsed': self.time_elapsed(), + 'file_hash': self.file_hash, + 'percent_complete': self.percent_complete(), + 'file_count': len(self.file_list()), + 'renderer': self.renderer, + 'renderer_version': self.renderer_version, + 'errors': getattr(self, 'errors', None), + 'total_frames': self.total_frames, + 'last_output': getattr(self, 'last_output', None) + } # convert to json and back to auto-convert dates to iso format def date_serializer(o): if isinstance(o, datetime): return o.isoformat() - json_convert = json.dumps(worker_data, default=date_serializer) + json_convert = json.dumps(job_dict, default=date_serializer) worker_json = json.loads(json_convert) return worker_json diff --git a/lib/render_workers/blender_worker.py b/lib/render_workers/blender_worker.py index c67e95f..d079862 100644 --- a/lib/render_workers/blender_worker.py +++ b/lib/render_workers/blender_worker.py @@ -13,9 +13,10 @@ class BlenderRenderWorker(BaseRenderWorker): engine = Blender - def __init__(self, input_path, output_path, args=None): - super(BlenderRenderWorker, self).__init__(input_path=input_path, output_path=output_path, - ignore_extensions=False, args=args) + def __init__(self, input_path, output_path, priority=2, args=None, owner=None, + client=None, name=None): + super(BlenderRenderWorker, self).__init__(input_path=input_path, output_path=output_path, args=args, + client=client, priority=priority, owner=owner, name=name) # Args self.blender_engine = self.args.get('engine', 'BLENDER_EEVEE').upper() diff --git a/lib/render_workers/ffmpeg_worker.py b/lib/render_workers/ffmpeg_worker.py index 4156313..0f44401 100644 --- a/lib/render_workers/ffmpeg_worker.py +++ b/lib/render_workers/ffmpeg_worker.py @@ -8,9 +8,10 @@ class FFMPEGRenderWorker(BaseRenderWorker): engine = FFMPEG - def __init__(self, input_path, output_path, args=None): - super(FFMPEGRenderWorker, self).__init__(input_path=input_path, output_path=output_path, ignore_extensions=True, - args=args) + def __init__(self, input_path, output_path, priority=2, args=None, owner=None, + client=None, name=None): + super(FFMPEGRenderWorker, self).__init__(input_path=input_path, output_path=output_path, args=args, + client=client, priority=priority, owner=owner, name=name) stream_info = subprocess.check_output([self.engine.renderer_path(), "-i", # https://stackoverflow.com/a/61604105 input_path, "-map", "0:v:0", "-c", "copy", "-f", "null", "-y", diff --git a/lib/render_workers/worker_factory.py b/lib/render_workers/worker_factory.py index f2cc3eb..4d40220 100644 --- a/lib/render_workers/worker_factory.py +++ b/lib/render_workers/worker_factory.py @@ -10,9 +10,11 @@ class RenderWorkerFactory: return classes @staticmethod - def create_worker(renderer, input_path, output_path, args=None): + def create_worker(renderer, input_path, output_path, priority=2, args=None, owner=None, + client=None, name=None): worker_class = RenderWorkerFactory.class_for_name(renderer) - return worker_class(input_path=input_path, output_path=output_path, args=args) + return worker_class(input_path=input_path, output_path=output_path, args=args, priority=priority, owner=owner, + client=client, name=name) @staticmethod def supported_renderers(): diff --git a/lib/scheduled_job.py b/lib/scheduled_job.py deleted file mode 100644 index c9df921..0000000 --- a/lib/scheduled_job.py +++ /dev/null @@ -1,161 +0,0 @@ -import glob -import hashlib -import json -import logging -import os -import uuid -from datetime import datetime - -from sqlalchemy import Column, Integer, String, DateTime, JSON, event -from sqlalchemy.ext.declarative import declarative_base - -from .render_workers.base_worker import RenderStatus -from .render_workers.worker_factory import RenderWorkerFactory - -logger = logging.getLogger() -Base = declarative_base() - - -class ScheduledJob(Base): - __tablename__ = 'scheduled_jobs' - id = Column(String, primary_key=True) - - renderer = Column(String) - input_path = Column(String) - output_path = Column(String) - priority = Column(Integer) - owner = Column(String) - client = Column(String) - notify = Column(String) - date_created = Column(DateTime) - scheduled_start = Column(DateTime) - name = Column(String) - file_hash = Column(String) - worker_data_store = Column(JSON) - - def __init__(self, renderer, input_path, output_path, args, priority=2, owner=None, client=None, notify=None, - custom_id=None, name=None): - self.id = custom_id or self.generate_id() - - self.renderer = renderer - self.input_path = input_path - self.output_path = output_path - self.priority = priority - self.owner = owner - self.client = client - self.notify = notify - self.date_created = datetime.now() - self.scheduled_start = None - self.name = name or os.path.basename(input_path) + '_' + self.date_created.strftime("%Y.%m.%d_%H.%M.%S") - - self.worker_object = RenderWorkerFactory.create_worker(renderer, input_path, output_path, args) - self.worker_object.log_path = self.log_path() - self.worker_object.validate() - - self.file_hash = None - if not self.file_hash and os.path.exists(input_path): - self.file_hash = hashlib.md5(open(input_path, 'rb').read()).hexdigest() - - def worker_data(self): - if hasattr(self, 'worker_object'): - fetched = self.worker_object.json() - if fetched != self.worker_data_store: - self.worker_data_store = fetched - return self.worker_data_store - - @staticmethod - def before_insert(mapper, connection, target): - logger.debug(f"Before insert: {target.name}") - if hasattr(target, 'worker_object'): - target.worker_data_store = target.worker_object.json() - - @staticmethod - def before_update(mapper, connection, target): - logger.debug(f"Before update: {target.name}") - if hasattr(target, 'worker_object'): - target.worker_data_store = target.worker_object.json() - - @classmethod - def register_user_events(cls): - event.listen(cls, 'before_insert', cls.before_insert) - event.listen(cls, 'before_update', cls.before_update) - - def render_status(self): - try: - worker_status = RenderStatus(self.worker_data()['status']) - if hasattr(self, 'worker_object'): - if self.scheduled_start and worker_status == RenderStatus.NOT_STARTED: - worker_status = RenderStatus.SCHEDULED - else: - if worker_status == RenderStatus.RUNNING: - worker_status = RenderStatus.ERROR - elif worker_status == RenderStatus.NOT_STARTED: - worker_status = RenderStatus.CANCELLED - except Exception as e: - logger.error(f"Exception fetching render status: {e}") - worker_status = RenderStatus.UNDEFINED - return worker_status - - def json(self): - """Converts RenderJob into JSON-friendly dict""" - job_dict = None - try: - worker_data = self.worker_data() - job_dict = { - 'id': self.id, - 'name': self.name, - 'input_path': self.input_path, - 'output_path': self.output_path, - 'priority': self.priority, - 'owner': self.owner, - 'client': self.client, - 'notify': self.notify, - 'date_created': self.date_created, - 'scheduled_start': self.scheduled_start, - 'start_time': worker_data.get('start_time', None), - 'status': self.render_status().value, - 'time_elapsed': worker_data.get('time_elapsed', None), - 'file_hash': self.file_hash, - 'percent_complete': worker_data.get('percent_complete', None), - 'file_count': len(self.file_list()), - 'renderer': self.renderer, - 'renderer_version': worker_data.get('renderer_version', None), - 'errors': worker_data.get('errors', []), - 'total_frames': worker_data.get('total_frames', -1), - 'last_output': worker_data.get('last_output', None) - } - - # convert to json and back to auto-convert dates to iso format - def date_serializer(o): - if isinstance(o, datetime): - return o.isoformat() - json_convert = json.dumps(job_dict, default=date_serializer) - job_dict = json.loads(json_convert) - except Exception as e: - logger.exception(e) - logger.error("Error converting to JSON: {}".format(e)) - return job_dict - - def start(self): - if hasattr(self, 'worker_object'): - self.worker_object.start() - - def stop(self): - if hasattr(self, 'worker_object'): - self.worker_object.stop() - - def work_path(self): - return os.path.dirname(self.output_path) - - def file_list(self): - job_dir = os.path.dirname(self.output_path) - file_list = glob.glob(os.path.join(job_dir, '*')) - file_list.sort() - return file_list - - def log_path(self): - return os.path.join(os.path.dirname(self.input_path), self.name + '.log') - - @classmethod - def generate_id(cls): - return str(uuid.uuid4()).split('-')[0] diff --git a/lib/server/job_server.py b/lib/server/job_server.py index 82a8850..d2ae020 100755 --- a/lib/server/job_server.py +++ b/lib/server/job_server.py @@ -16,7 +16,6 @@ import yaml from flask import Flask, request, render_template, send_file, after_this_request, Response, redirect, url_for, abort from werkzeug.utils import secure_filename -from lib.scheduled_job import ScheduledJob from lib.render_queue import RenderQueue, JobNotFoundError from lib.render_workers.worker_factory import RenderWorkerFactory from lib.render_workers.base_worker import string_to_status, RenderStatus @@ -34,7 +33,7 @@ def sorted_jobs(all_jobs, sort_by_date=True): sorted_job_list = [] if all_jobs: for status_category in categories: - found_jobs = [x for x in all_jobs if x.render_status() == status_category.value] + found_jobs = [x for x in all_jobs if x.status == status_category.value] if found_jobs: sorted_found_jobs = sorted(found_jobs, key=lambda d: d.date_created, reverse=True) sorted_job_list.extend(sorted_found_jobs) @@ -61,11 +60,11 @@ def job_detail(job_id): table_html = json2html.json2html.convert(json=found_job.json(), table_attributes='class="table is-narrow is-striped is-fullwidth"') media_url = None - if found_job.file_list() and found_job.render_status() == RenderStatus.COMPLETED: + if found_job.file_list() and found_job.status == RenderStatus.COMPLETED: media_basename = os.path.basename(found_job.file_list()[0]) media_url = f"/api/job/{job_id}/file/{media_basename}" return render_template('details.html', detail_table=table_html, media_url=media_url, - hostname=RenderQueue.hostname, job_status=found_job.render_status().value.title(), + hostname=RenderQueue.hostname, job_status=found_job.status.value.title(), job=found_job, renderer_info=renderer_info()) @@ -79,20 +78,20 @@ def job_thumbnail(job_id): thumb_image_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.jpg') if not os.path.exists(thumb_video_path) and not os.path.exists(thumb_video_path + '_IN-PROGRESS') and \ - found_job.render_status() not in [RenderStatus.CANCELLED, RenderStatus.ERROR]: + found_job.status not in [RenderStatus.CANCELLED, RenderStatus.ERROR]: generate_thumbnail_for_job(found_job, thumb_video_path, thumb_image_path, max_width=240) if os.path.exists(thumb_video_path) and not os.path.exists(thumb_video_path + '_IN-PROGRESS'): return send_file(thumb_video_path, mimetype="video/mp4") elif os.path.exists(thumb_image_path): return send_file(thumb_image_path, mimetype='image/jpeg') - elif found_job.render_status() == RenderStatus.RUNNING: + elif found_job.status == RenderStatus.RUNNING: return send_file('static/images/gears.png', mimetype="image/png") - elif found_job.render_status() == RenderStatus.CANCELLED: + elif found_job.status == RenderStatus.CANCELLED: return send_file('static/images/cancelled.png', mimetype="image/png") - elif found_job.render_status() == RenderStatus.SCHEDULED: + elif found_job.status == RenderStatus.SCHEDULED: return send_file('static/images/scheduled.png', mimetype="image/png") - elif found_job.render_status() == RenderStatus.NOT_STARTED: + elif found_job.status == RenderStatus.NOT_STARTED: return send_file('static/images/not_started.png', mimetype="image/png") return send_file('static/images/error.png', mimetype="image/png") @@ -329,8 +328,13 @@ def add_job(job_params, remove_job_dir_on_failure=False): if client == RenderQueue.hostname: logger.info(f"Creating job locally - {name if name else input_path}") try: - render_job = ScheduledJob(renderer, input_path, output_path, args, priority, job_owner, client, - notify=False, custom_id=custom_id, name=name) + render_job = RenderWorkerFactory.create_worker(renderer=renderer, input_path=input_path, + output_path=output_path, args=args) + render_job.client = client + render_job.owner = job_owner + render_job.name = name + render_job.priority = priority + RenderQueue.add_to_render_queue(render_job, force_start=force_start) return render_job.json() except Exception as e: diff --git a/lib/server/templates/details.html b/lib/server/templates/details.html index 5e19b6a..020daa6 100644 --- a/lib/server/templates/details.html +++ b/lib/server/templates/details.html @@ -17,7 +17,7 @@
Time Elapsed: {{job.worker_data()['time_elapsed']}}