Merge pull request #5 from blw1138/scheduled_jobs_workers_merge

Scheduled jobs -> workers merge
Brett Williams
2023-05-30 22:56:57 -05:00
committed by GitHub
10 changed files with 124 additions and 221 deletions

View File

@@ -7,8 +7,7 @@ import requests
from sqlalchemy import create_engine, Column, String, Integer
from sqlalchemy.orm import sessionmaker
from .render_workers.base_worker import RenderStatus
from .scheduled_job import ScheduledJob, Base
from .render_workers.base_worker import RenderStatus, BaseRenderWorker, Base
logger = logging.getLogger()
@@ -45,7 +44,6 @@ class RenderQueue:
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
ScheduledJob.register_user_events()
job_queue = []
maximum_renderer_instances = {'blender': 1, 'aerender': 1, 'ffmpeg': 4}
hostname = None
@@ -62,7 +60,7 @@ class RenderQueue:
def add_to_render_queue(cls, render_job, force_start=False, client=None):
if not client or render_job.client == cls.hostname:
logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job.worker_object))
logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job))
render_job.client = cls.hostname
cls.job_queue.append(render_job)
if force_start:
@@ -89,7 +87,7 @@ class RenderQueue:
@classmethod
def jobs_with_status(cls, status, priority_sorted=False):
found_jobs = [x for x in cls.all_jobs() if x.render_status() == status]
found_jobs = [x for x in cls.all_jobs() if x.status == status]
if priority_sorted:
found_jobs = sorted(found_jobs, key=lambda a: a.priority, reverse=False)
return found_jobs
@@ -103,7 +101,7 @@ class RenderQueue:
@classmethod
def clear_history(cls):
to_remove = [x for x in cls.all_jobs() if x.render_status() in [RenderStatus.CANCELLED,
to_remove = [x for x in cls.all_jobs() if x.status in [RenderStatus.CANCELLED,
RenderStatus.COMPLETED, RenderStatus.ERROR]]
for job_to_remove in to_remove:
cls.delete_job(job_to_remove)
@@ -111,7 +109,7 @@ class RenderQueue:
@classmethod
def load_state(cls):
cls.job_queue = cls.session.query(ScheduledJob).all()
cls.job_queue = cls.session.query(BaseRenderWorker).all()
@classmethod
def save_state(cls):
@@ -141,15 +139,14 @@ class RenderQueue:
@classmethod
def start_job(cls, job):
logger.info('Starting {}render: {} - Priority {}'.format('scheduled ' if job.scheduled_start else '', job.name,
job.priority))
logger.info(f'Starting render: {job.name} - Priority {job.priority}')
job.start()
@classmethod
def cancel_job(cls, job):
logger.info(f'Cancelling job ID: {job.id}')
job.stop()
return job.render_status() == RenderStatus.CANCELLED
return job.status == RenderStatus.CANCELLED
@classmethod
def delete_job(cls, job):
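
With ScheduledJob removed, the queue persists and filters BaseRenderWorker rows directly, and status checks go through the new enum-backed `status` property instead of `render_status()`. A minimal sketch of the filtering pattern, assuming placeholder enum values and a stand-in Job class (neither is the real model):

```python
from enum import Enum

class RenderStatus(Enum):
    # placeholder strings; the real enum values are not shown in this diff
    NOT_STARTED = 'not_started'
    RUNNING = 'running'
    COMPLETED = 'completed'

class Job:
    """Stand-in for BaseRenderWorker: the DB column stores a status string,
    callers see a RenderStatus member."""
    def __init__(self, name, priority, status):
        self.name = name
        self.priority = priority
        self._status = status.value

    @property
    def status(self):
        return RenderStatus(self._status)

def jobs_with_status(jobs, status, priority_sorted=False):
    # mirrors the new RenderQueue.jobs_with_status: compare the enum directly
    found = [j for j in jobs if j.status == status]
    if priority_sorted:
        found = sorted(found, key=lambda j: j.priority)
    return found

queue = [Job('credits', 3, RenderStatus.NOT_STARTED),
         Job('intro', 1, RenderStatus.RUNNING)]
print([j.name for j in jobs_with_status(queue, RenderStatus.RUNNING)])  # ['intro']
```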

View File

@@ -22,9 +22,10 @@ class AERenderWorker(BaseRenderWorker):
supported_extensions = ['.aep']
engine = AERender
def __init__(self, input_path, output_path, args=None):
super(AERenderWorker, self).__init__(input_path=input_path, output_path=output_path, ignore_extensions=False,
args=args)
def __init__(self, input_path, output_path, priority=2, args=None, owner=None,
client=None, name=None):
super(AERenderWorker, self).__init__(input_path=input_path, output_path=output_path, args=args,
client=client, priority=priority, owner=owner, name=name)
self.comp = args.get('comp', None)
self.render_settings = args.get('render_settings', None)

View File

@@ -5,12 +5,17 @@ import os
import subprocess
import threading
import json
import glob
from datetime import datetime
from enum import Enum
from sqlalchemy import Column, Integer, String, DateTime, JSON, event
from sqlalchemy.ext.declarative import declarative_base
import psutil
logger = logging.getLogger()
Base = declarative_base()
class RenderStatus(Enum):
@@ -27,14 +32,32 @@ def string_to_status(string):
for stat in RenderStatus:
if stat.value == string:
return stat
return RenderStatus.ERROR
return RenderStatus.UNDEFINED
class BaseRenderWorker(object):
class BaseRenderWorker(Base):
__tablename__ = 'render_workers'
id = Column(String, primary_key=True)
input_path = Column(String)
output_path = Column(String)
date_created = Column(DateTime)
start_time = Column(DateTime)
end_time = Column(DateTime)
renderer = Column(String)
renderer_version = Column(String)
priority = Column(Integer)
total_frames = Column(Integer)
owner = Column(String)
client = Column(String)
name = Column(String)
file_hash = Column(String)
_status = Column(String)
engine = None
def __init__(self, input_path, output_path, args=None, ignore_extensions=True):
def __init__(self, input_path, output_path, priority=2, args=None, ignore_extensions=True, owner=None, client=None,
name=None):
if not ignore_extensions:
if not any(ext in input_path for ext in self.engine.supported_extensions):
@@ -44,19 +67,28 @@ class BaseRenderWorker(object):
if not self.engine:
raise NotImplementedError("Engine not defined")
def generate_id():
import uuid
return str(uuid.uuid4()).split('-')[0]
# Essential Info
self.id = generate_id()
self.input_path = input_path
self.output_path = output_path
self.args = args or {}
self.date_created = datetime.now()
self.renderer = self.engine.name()
self.renderer_version = self.engine.version()
self.priority = priority
self.owner = owner
self.client = client
self.name = name
# Frame Ranges
self.total_frames = 0
self.current_frame = 0
# Logging
self.log_path = None
self.start_time = None
self.end_time = None
@@ -74,6 +106,18 @@ class BaseRenderWorker(object):
self.is_finished = False
self.last_output = None
@property
def status(self):
return self._status
@status.setter
def status(self, value):
self._status = value.value
@status.getter
def status(self):
return string_to_status(self._status)
def validate(self):
if not os.path.exists(self.input_path):
raise FileNotFoundError(f"Cannot find input path: {self.input_path}")
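
The `_status` column stores the enum's string value while the property hands callers a RenderStatus member in both directions; because `@status.getter` is applied after the setter, the later getter wins. A standalone sketch of the same pattern (enum values are placeholders; the real fallback in base_worker.py is RenderStatus.UNDEFINED):

```python
from enum import Enum

class RenderStatus(Enum):
    NOT_STARTED = 'not_started'   # placeholder strings, not the real values
    RUNNING = 'running'

def string_to_status(string):
    for stat in RenderStatus:
        if stat.value == string:
            return stat
    return RenderStatus.NOT_STARTED  # base_worker.py falls back to RenderStatus.UNDEFINED

class Worker:
    def __init__(self):
        self._status = RenderStatus.NOT_STARTED.value  # what the String column persists

    @property
    def status(self):
        return self._status

    @status.setter
    def status(self, value):
        self._status = value.value  # store only the string so SQLAlchemy can save it

    @status.getter
    def status(self):
        return string_to_status(self._status)  # callers always get an enum member back

w = Worker()
w.status = RenderStatus.RUNNING
assert w.status is RenderStatus.RUNNING
assert w._status == 'running'
```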
@@ -101,6 +145,11 @@ class BaseRenderWorker(object):
def generate_worker_subprocess(self):
raise NotImplementedError("generate_worker_subprocess not implemented")
def log_path(self):
filename = (self.name or os.path.basename(self.input_path)) + '_' + \
self.date_created.strftime("%Y.%m.%d_%H.%M.%S") + '.log'
return os.path.join(os.path.dirname(self.input_path), filename)
def start(self):
if not os.path.exists(self.input_path):
@@ -122,17 +171,9 @@ class BaseRenderWorker(object):
self.__thread.start()
def run(self):
# Setup logging
try:
if not self.log_path:
log_dir = os.path.join(os.path.dirname(self.input_path), 'logs')
if not os.path.exists(log_dir):
os.makedirs(log_dir)
self.log_path = os.path.join(log_dir, os.path.basename(self.input_path)) + '.log'
logger.info('Logs saved in {}'.format(self.log_path))
except Exception as e:
logger.error("Error setting up logging: {}".format(e))
log_dir = os.path.dirname(self.log_path())
os.makedirs(log_dir, exist_ok=True)
while self.failed_attempts < self.maximum_attempts and self.status is not RenderStatus.COMPLETED:
@@ -146,7 +187,7 @@ class BaseRenderWorker(object):
universal_newlines=False)
self.start_time = datetime.now()
with open(self.log_path, "a") as f:
with open(self.log_path(), "a") as f:
f.write(f"{self.start_time.isoformat()} - Starting {self.engine.name()} {self.engine.version()} "
f"Render for {self.input_path}")
@@ -257,24 +298,41 @@ class BaseRenderWorker(object):
elapsed_time_string = strfdelta(elapsed_time) if elapsed_time else "Unknown"
return elapsed_time_string
def file_list(self):
job_dir = os.path.dirname(self.output_path)
file_list = glob.glob(os.path.join(job_dir, '*'))
file_list.sort()
return file_list
def json(self):
worker_data = self.__dict__.copy()
worker_data['percent_complete'] = self.percent_complete()
worker_data['time_elapsed'] = self.time_elapsed()
worker_data['status'] = self.status.value
keys_to_remove = ['thread', 'process'] # remove unwanted keys from dict
for key in worker_data.keys():
if key.startswith('_'):
keys_to_remove.append(key)
for key in keys_to_remove:
worker_data.pop(key, None)
job_dict = {
'id': self.id,
'name': self.name,
'input_path': self.input_path,
'output_path': self.output_path,
'priority': self.priority,
'owner': self.owner,
'client': self.client,
'date_created': self.date_created,
'start_time': self.start_time,
'status': self.status.value,
'time_elapsed': self.time_elapsed(),
'file_hash': self.file_hash,
'percent_complete': self.percent_complete(),
'file_count': len(self.file_list()),
'renderer': self.renderer,
'renderer_version': self.renderer_version,
'errors': getattr(self, 'errors', None),
'total_frames': self.total_frames,
'last_output': getattr(self, 'last_output', None)
}
# convert to json and back to auto-convert dates to iso format
def date_serializer(o):
if isinstance(o, datetime):
return o.isoformat()
json_convert = json.dumps(worker_data, default=date_serializer)
json_convert = json.dumps(job_dict, default=date_serializer)
worker_json = json.loads(json_convert)
return worker_json
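
json() now builds an explicit dict instead of copying `__dict__` and pruning private keys, then round-trips through json.dumps/json.loads so datetimes come back as ISO strings. The serializer in isolation (sample values are made up):

```python
import json
from datetime import datetime

def to_json(job_dict):
    # same trick as BaseRenderWorker.json(): dump with a datetime-aware default,
    # then load again so every date lands as an ISO-8601 string
    def date_serializer(o):
        if isinstance(o, datetime):
            return o.isoformat()
    return json.loads(json.dumps(job_dict, default=date_serializer))

print(to_json({'id': 'ab12cd34', 'date_created': datetime(2023, 5, 30, 22, 56)}))
# {'id': 'ab12cd34', 'date_created': '2023-05-30T22:56:00'}
```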

View File

@@ -13,9 +13,10 @@ class BlenderRenderWorker(BaseRenderWorker):
engine = Blender
def __init__(self, input_path, output_path, args=None):
super(BlenderRenderWorker, self).__init__(input_path=input_path, output_path=output_path,
ignore_extensions=False, args=args)
def __init__(self, input_path, output_path, priority=2, args=None, owner=None,
client=None, name=None):
super(BlenderRenderWorker, self).__init__(input_path=input_path, output_path=output_path, args=args,
client=client, priority=priority, owner=owner, name=name)
# Args
self.blender_engine = self.args.get('engine', 'BLENDER_EEVEE').upper()

View File

@@ -8,9 +8,10 @@ class FFMPEGRenderWorker(BaseRenderWorker):
engine = FFMPEG
def __init__(self, input_path, output_path, args=None):
super(FFMPEGRenderWorker, self).__init__(input_path=input_path, output_path=output_path, ignore_extensions=True,
args=args)
def __init__(self, input_path, output_path, priority=2, args=None, owner=None,
client=None, name=None):
super(FFMPEGRenderWorker, self).__init__(input_path=input_path, output_path=output_path, args=args,
client=client, priority=priority, owner=owner, name=name)
stream_info = subprocess.check_output([self.engine.renderer_path(), "-i", # https://stackoverflow.com/a/61604105
input_path, "-map", "0:v:0", "-c", "copy", "-f", "null", "-y",

View File

@@ -10,9 +10,11 @@ class RenderWorkerFactory:
return classes
@staticmethod
def create_worker(renderer, input_path, output_path, args=None):
def create_worker(renderer, input_path, output_path, priority=2, args=None, owner=None,
client=None, name=None):
worker_class = RenderWorkerFactory.class_for_name(renderer)
return worker_class(input_path=input_path, output_path=output_path, args=args)
return worker_class(input_path=input_path, output_path=output_path, args=args, priority=priority, owner=owner,
client=client, name=name)
@staticmethod
def supported_renderers():
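
create_worker now accepts the queue metadata directly and forwards it to the worker constructors. A hypothetical call site (paths, owner, client, and name are illustrative):

```python
from lib.render_workers.worker_factory import RenderWorkerFactory
from lib.render_queue import RenderQueue

worker = RenderWorkerFactory.create_worker(renderer='ffmpeg',
                                           input_path='/renders/shot010/shot010.mov',
                                           output_path='/renders/shot010/out/shot010.mp4',
                                           priority=1,
                                           args={},
                                           owner='artist01',
                                           client='render-node-01',
                                           name='shot010_encode')
RenderQueue.add_to_render_queue(worker, force_start=True)
```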

View File

@@ -1,161 +0,0 @@
import glob
import hashlib
import json
import logging
import os
import uuid
from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, JSON, event
from sqlalchemy.ext.declarative import declarative_base
from .render_workers.base_worker import RenderStatus
from .render_workers.worker_factory import RenderWorkerFactory
logger = logging.getLogger()
Base = declarative_base()
class ScheduledJob(Base):
__tablename__ = 'scheduled_jobs'
id = Column(String, primary_key=True)
renderer = Column(String)
input_path = Column(String)
output_path = Column(String)
priority = Column(Integer)
owner = Column(String)
client = Column(String)
notify = Column(String)
date_created = Column(DateTime)
scheduled_start = Column(DateTime)
name = Column(String)
file_hash = Column(String)
worker_data_store = Column(JSON)
def __init__(self, renderer, input_path, output_path, args, priority=2, owner=None, client=None, notify=None,
custom_id=None, name=None):
self.id = custom_id or self.generate_id()
self.renderer = renderer
self.input_path = input_path
self.output_path = output_path
self.priority = priority
self.owner = owner
self.client = client
self.notify = notify
self.date_created = datetime.now()
self.scheduled_start = None
self.name = name or os.path.basename(input_path) + '_' + self.date_created.strftime("%Y.%m.%d_%H.%M.%S")
self.worker_object = RenderWorkerFactory.create_worker(renderer, input_path, output_path, args)
self.worker_object.log_path = self.log_path()
self.worker_object.validate()
self.file_hash = None
if not self.file_hash and os.path.exists(input_path):
self.file_hash = hashlib.md5(open(input_path, 'rb').read()).hexdigest()
def worker_data(self):
if hasattr(self, 'worker_object'):
fetched = self.worker_object.json()
if fetched != self.worker_data_store:
self.worker_data_store = fetched
return self.worker_data_store
@staticmethod
def before_insert(mapper, connection, target):
logger.debug(f"Before insert: {target.name}")
if hasattr(target, 'worker_object'):
target.worker_data_store = target.worker_object.json()
@staticmethod
def before_update(mapper, connection, target):
logger.debug(f"Before update: {target.name}")
if hasattr(target, 'worker_object'):
target.worker_data_store = target.worker_object.json()
@classmethod
def register_user_events(cls):
event.listen(cls, 'before_insert', cls.before_insert)
event.listen(cls, 'before_update', cls.before_update)
def render_status(self):
try:
worker_status = RenderStatus(self.worker_data()['status'])
if hasattr(self, 'worker_object'):
if self.scheduled_start and worker_status == RenderStatus.NOT_STARTED:
worker_status = RenderStatus.SCHEDULED
else:
if worker_status == RenderStatus.RUNNING:
worker_status = RenderStatus.ERROR
elif worker_status == RenderStatus.NOT_STARTED:
worker_status = RenderStatus.CANCELLED
except Exception as e:
logger.error(f"Exception fetching render status: {e}")
worker_status = RenderStatus.UNDEFINED
return worker_status
def json(self):
"""Converts RenderJob into JSON-friendly dict"""
job_dict = None
try:
worker_data = self.worker_data()
job_dict = {
'id': self.id,
'name': self.name,
'input_path': self.input_path,
'output_path': self.output_path,
'priority': self.priority,
'owner': self.owner,
'client': self.client,
'notify': self.notify,
'date_created': self.date_created,
'scheduled_start': self.scheduled_start,
'start_time': worker_data.get('start_time', None),
'status': self.render_status().value,
'time_elapsed': worker_data.get('time_elapsed', None),
'file_hash': self.file_hash,
'percent_complete': worker_data.get('percent_complete', None),
'file_count': len(self.file_list()),
'renderer': self.renderer,
'renderer_version': worker_data.get('renderer_version', None),
'errors': worker_data.get('errors', []),
'total_frames': worker_data.get('total_frames', -1),
'last_output': worker_data.get('last_output', None)
}
# convert to json and back to auto-convert dates to iso format
def date_serializer(o):
if isinstance(o, datetime):
return o.isoformat()
json_convert = json.dumps(job_dict, default=date_serializer)
job_dict = json.loads(json_convert)
except Exception as e:
logger.exception(e)
logger.error("Error converting to JSON: {}".format(e))
return job_dict
def start(self):
if hasattr(self, 'worker_object'):
self.worker_object.start()
def stop(self):
if hasattr(self, 'worker_object'):
self.worker_object.stop()
def work_path(self):
return os.path.dirname(self.output_path)
def file_list(self):
job_dir = os.path.dirname(self.output_path)
file_list = glob.glob(os.path.join(job_dir, '*'))
file_list.sort()
return file_list
def log_path(self):
return os.path.join(os.path.dirname(self.input_path), self.name + '.log')
@classmethod
def generate_id(cls):
return str(uuid.uuid4()).split('-')[0]
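
scheduled_job.py goes away entirely; its persisted fields (priority, owner, client, name, file_hash) and helpers such as file_list() and log_path() now live on BaseRenderWorker. One thing this diff does not show being re-wired is the md5 content hash the old constructor computed; a hedged sketch of that logic, should it need to move onto the worker:

```python
import hashlib
import os

def compute_file_hash(input_path):
    # same md5-of-contents hash the removed ScheduledJob.__init__ produced,
    # wrapped in a context manager so the file handle is closed
    if os.path.exists(input_path):
        with open(input_path, 'rb') as f:
            return hashlib.md5(f.read()).hexdigest()
    return None
```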

View File

@@ -16,7 +16,6 @@ import yaml
from flask import Flask, request, render_template, send_file, after_this_request, Response, redirect, url_for, abort
from werkzeug.utils import secure_filename
from lib.scheduled_job import ScheduledJob
from lib.render_queue import RenderQueue, JobNotFoundError
from lib.render_workers.worker_factory import RenderWorkerFactory
from lib.render_workers.base_worker import string_to_status, RenderStatus
@@ -34,7 +33,7 @@ def sorted_jobs(all_jobs, sort_by_date=True):
sorted_job_list = []
if all_jobs:
for status_category in categories:
found_jobs = [x for x in all_jobs if x.render_status() == status_category.value]
found_jobs = [x for x in all_jobs if x.status == status_category.value]
if found_jobs:
sorted_found_jobs = sorted(found_jobs, key=lambda d: d.date_created, reverse=True)
sorted_job_list.extend(sorted_found_jobs)
@@ -61,11 +60,11 @@ def job_detail(job_id):
table_html = json2html.json2html.convert(json=found_job.json(),
table_attributes='class="table is-narrow is-striped is-fullwidth"')
media_url = None
if found_job.file_list() and found_job.render_status() == RenderStatus.COMPLETED:
if found_job.file_list() and found_job.status == RenderStatus.COMPLETED:
media_basename = os.path.basename(found_job.file_list()[0])
media_url = f"/api/job/{job_id}/file/{media_basename}"
return render_template('details.html', detail_table=table_html, media_url=media_url,
hostname=RenderQueue.hostname, job_status=found_job.render_status().value.title(),
hostname=RenderQueue.hostname, job_status=found_job.status.value.title(),
job=found_job, renderer_info=renderer_info())
@@ -79,20 +78,20 @@ def job_thumbnail(job_id):
thumb_image_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.jpg')
if not os.path.exists(thumb_video_path) and not os.path.exists(thumb_video_path + '_IN-PROGRESS') and \
found_job.render_status() not in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
found_job.status not in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
generate_thumbnail_for_job(found_job, thumb_video_path, thumb_image_path, max_width=240)
if os.path.exists(thumb_video_path) and not os.path.exists(thumb_video_path + '_IN-PROGRESS'):
return send_file(thumb_video_path, mimetype="video/mp4")
elif os.path.exists(thumb_image_path):
return send_file(thumb_image_path, mimetype='image/jpeg')
elif found_job.render_status() == RenderStatus.RUNNING:
elif found_job.status == RenderStatus.RUNNING:
return send_file('static/images/gears.png', mimetype="image/png")
elif found_job.render_status() == RenderStatus.CANCELLED:
elif found_job.status == RenderStatus.CANCELLED:
return send_file('static/images/cancelled.png', mimetype="image/png")
elif found_job.render_status() == RenderStatus.SCHEDULED:
elif found_job.status == RenderStatus.SCHEDULED:
return send_file('static/images/scheduled.png', mimetype="image/png")
elif found_job.render_status() == RenderStatus.NOT_STARTED:
elif found_job.status == RenderStatus.NOT_STARTED:
return send_file('static/images/not_started.png', mimetype="image/png")
return send_file('static/images/error.png', mimetype="image/png")
@@ -329,8 +328,13 @@ def add_job(job_params, remove_job_dir_on_failure=False):
if client == RenderQueue.hostname:
logger.info(f"Creating job locally - {name if name else input_path}")
try:
render_job = ScheduledJob(renderer, input_path, output_path, args, priority, job_owner, client,
notify=False, custom_id=custom_id, name=name)
render_job = RenderWorkerFactory.create_worker(renderer=renderer, input_path=input_path,
output_path=output_path, args=args)
render_job.client = client
render_job.owner = job_owner
render_job.name = name
render_job.priority = priority
RenderQueue.add_to_render_queue(render_job, force_start=force_start)
return render_job.json()
except Exception as e:
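
On the server side, add_job now asks the factory for a worker and attaches the request metadata afterwards rather than wrapping everything in a ScheduledJob. A condensed sketch of the local-creation branch (values are illustrative; error handling omitted):

```python
from lib.render_workers.worker_factory import RenderWorkerFactory
from lib.render_queue import RenderQueue

render_job = RenderWorkerFactory.create_worker(renderer='blender',
                                               input_path='/jobs/ab12cd34/scene.blend',
                                               output_path='/jobs/ab12cd34/out/frame_####.png',
                                               args={'engine': 'cycles'})
# metadata is set after construction, as in the new hunk above
render_job.client = RenderQueue.hostname
render_job.owner = 'artist01'
render_job.name = 'scene_preview'
render_job.priority = 2
RenderQueue.add_to_render_queue(render_job, force_start=False)
response = render_job.json()
```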

View File

@@ -17,7 +17,7 @@
<br>Time Elapsed: <span id="time-elapsed">{{job.worker_data()['time_elapsed']}}</span>
</span>
<script>
var startingStatus = '{{job.render_status().value}}';
var startingStatus = '{{job.status.value}}';
function update_job() {
$.getJSON('/api/job/{{job.id}}', function(data) {
document.getElementById('progress-bar').value = (data.percent_complete * 100);

View File

@@ -36,7 +36,7 @@ def generate_thumbnail_for_job(job, thumb_video_path, thumb_image_path, max_widt
os.remove(in_progress_path)
# Determine best source file to use for thumbs
if job.render_status() == RenderStatus.COMPLETED: # use finished file for thumb
if job.status == RenderStatus.COMPLETED: # use finished file for thumb
source_path = job.file_list()
else:
source_path = [job.input_path] # use source if nothing else