Merge pull request #3 from blw1138/worker_engine_refactor

Worker engine refactor
This commit is contained in:
2023-05-22 19:47:40 -05:00
committed by GitHub
6 changed files with 196 additions and 136 deletions

View File

@@ -7,7 +7,7 @@ from datetime import datetime
import psutil
import requests
from .render_job import RenderJob
from .scheduled_job import ScheduledJob
from .render_workers.render_worker import RenderStatus
logger = logging.getLogger()
@@ -25,7 +25,7 @@ class JobNotFoundError(Exception):
class RenderQueue:
job_queue = []
render_clients = []
maximum_renderer_instances = {'Blender': 2, 'After Effects': 1, 'ffmpeg': 4}
maximum_renderer_instances = {'blender': 2, 'aerender': 1, 'ffmpeg': 4}
host_name = None
port = 8080
client_mode = False
@@ -95,9 +95,9 @@ class RenderQueue:
for job in saved_state.get('jobs', []):
try:
render_job = RenderJob(renderer=job['renderer'], input_path=job['worker']['input_path'],
output_path=job['worker']['output_path'], args=job['worker']['args'],
priority=job['priority'], client=job['client'])
render_job = ScheduledJob(renderer=job['renderer'], input_path=job['worker']['input_path'],
output_path=job['worker']['output_path'], args=job['worker']['args'],
priority=job['priority'], client=job['client'])
# Load Worker values
for key, val in job['worker'].items():
@@ -153,7 +153,7 @@ class RenderQueue:
not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
if not_started:
for job in not_started:
renderer = job.worker.renderer
renderer = job.worker.engine.name()
higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < job.priority]
max_renderers = renderer in instances.keys() and instances[
renderer] >= cls.maximum_renderer_instances.get(renderer, 1)
@@ -191,7 +191,7 @@ class RenderQueue:
@classmethod
def renderer_instances(cls):
from collections import Counter
all_instances = [x.worker.renderer for x in cls.running_jobs()]
all_instances = [x.worker.engine.name() for x in cls.running_jobs()]
return Counter(all_instances)
@classmethod

View File

@@ -4,21 +4,70 @@ import re
from .render_worker import *
class Blender(BaseRenderEngine):
install_paths = ['/Applications/Blender.app/Contents/MacOS/Blender']
supported_extensions = ['.blend']
@classmethod
def version(cls):
version = None
try:
render_path = cls.renderer_path()
if render_path:
ver_out = subprocess.check_output([render_path, '-v'])
version = ver_out.decode('utf-8').splitlines()[0].replace('Blender', '').strip()
except Exception as e:
logging.error(f'Failed to get Blender version: {e}')
return version
@classmethod
def get_formats(cls):
format_string = cls.get_help().split('Format Options')[-1].split('Animation Playback Options')[0]
formats = re.findall(r"'([A-Z_0-9]+)'", format_string)
return formats
@classmethod
def full_report(cls):
return {'version': cls.version(),
'help_text': cls.get_help(),
'formats': cls.get_formats()}
@classmethod
def run_python_expression(cls, path, python_expression):
if os.path.exists(path):
try:
return subprocess.run([cls.renderer_path(), '-b', path, '--python-expr', python_expression],
capture_output=True)
except Exception as e:
logger.warning(f"Error running python expression in blender: {e}")
pass
else:
raise FileNotFoundError
@classmethod
def run_python_script(cls, path, python_path):
if os.path.exists(path) and os.path.exists(python_path):
try:
return subprocess.run([cls.renderer_path(), '-b', path, '--python', python_path],
capture_output=True)
except Exception as e:
logger.warning(f"Error running python expression in blender: {e}")
pass
else:
raise FileNotFoundError
class BlenderRenderWorker(BaseRenderWorker):
renderer = 'Blender'
render_engine = 'blender'
supported_extensions = ['.blend']
install_paths = ['/Applications/Blender.app/Contents/MacOS/Blender']
supported_export_formats = ['TGA', 'RAWTGA', 'JPEG', 'IRIS', 'IRIZ', 'AVIRAW', 'AVIJPEG', 'PNG', 'BMP', 'HDR',
'TIFF', 'OPEN_EXR', 'OPEN_EXR_MULTILAYER', 'MPEG', 'CINEON', 'DPX', 'DDS', 'JP2']
engine = Blender
def __init__(self, input_path, output_path, args=None):
super(BlenderRenderWorker, self).__init__(input_path=input_path, output_path=output_path,
ignore_extensions=False, args=args)
# Args
self.engine = self.args.get('engine', 'BLENDER_EEVEE').upper()
self.blender_engine = self.args.get('engine', 'BLENDER_EEVEE').upper()
self.export_format = self.args.get('export_format', None) or 'JPEG'
self.camera = self.args.get('camera', None)
self.render_all_frames = self.args.get('render_all_frames', False) or \
@@ -34,21 +83,9 @@ class BlenderRenderWorker(BaseRenderWorker):
if self.render_all_frames else 1
self.current_frame = int(self.scene_info.get('frame_start', 0))
@classmethod
def version(cls):
version = None
try:
render_path = cls.renderer_path()
if render_path:
ver_out = subprocess.check_output([render_path, '-v'])
version = ver_out.decode('utf-8').splitlines()[0].replace('Blender', '').strip()
except Exception as e:
logging.error(f'Failed to get {cls.renderer} version: {e}')
return version
def generate_worker_subprocess(self):
cmd = [self.renderer_path()]
cmd = [self.engine.renderer_path()]
if self.args.get('background', True): # optionally run render not in background
cmd.append('-b')
cmd.append(self.input_path)
@@ -56,7 +93,7 @@ class BlenderRenderWorker(BaseRenderWorker):
if self.camera:
cmd.extend(['--python-expr', f"import bpy;bpy.context.scene.camera = bpy.data.objects['{self.camera}'];"])
cmd.extend(['-E', self.engine, '-o', self.output_path, '-F', self.export_format])
cmd.extend(['-E', self.blender_engine, '-o', self.output_path, '-F', self.export_format])
# all frames or single
cmd.extend(['-a'] if self.render_all_frames else ['-f', str(self.frame_to_render)])
@@ -128,30 +165,6 @@ class BlenderRenderWorker(BaseRenderWorker):
return max(total_percent, 0)
def run_python_expression_in_blend(path, python_expression):
if os.path.exists(path):
try:
return subprocess.run([BlenderRenderWorker.renderer_path(), '-b', path, '--python-expr', python_expression],
capture_output=True)
except Exception as e:
logger.warning(f"Error running python expression in blender: {e}")
pass
else:
raise FileNotFoundError
def run_python_script_in_blend(path, python_path):
if os.path.exists(path) and os.path.exists(python_path):
try:
return subprocess.run([BlenderRenderWorker.renderer_path(), '-b', path, '--python', python_path],
capture_output=True)
except Exception as e:
logger.warning(f"Error running python expression in blender: {e}")
pass
else:
raise FileNotFoundError
def pack_blender_files(path):
# Credit to L0Lock for pack script - https://blender.stackexchange.com/a/243935
pack_script = "import bpy\n" \
@@ -161,7 +174,7 @@ def pack_blender_files(path):
"bpy.ops.wm.save_as_mainfile(filepath=myPath[:-6]+'_packed'+myPath[-6:])"
try:
results = run_python_expression_in_blend(path, pack_script)
results = Blender.run_python_script(path, pack_script)
result_text = results.stdout.decode()
dir_name = os.path.dirname(path)
@@ -186,7 +199,7 @@ def get_scene_info(path):
scene_info = None
try:
results = run_python_script_in_blend(path, os.path.join(os.path.dirname(os.path.realpath(__file__)),
results = Blender.run_python_script(path, os.path.join(os.path.dirname(os.path.realpath(__file__)),
'scripts', 'get_blender_info.py'))
result_text = results.stdout.decode()
for line in result_text.splitlines():
@@ -201,12 +214,14 @@ def get_scene_info(path):
if __name__ == '__main__':
print(Blender.full_report())
# x = pack_blender_files('/Users/brett/Blender Files/temple_animatic.blend')
# print(x)
logging.basicConfig(format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S', level=logging.DEBUG)
r = BlenderRenderWorker('/Users/brett/Blender Files/temple_animatic.blend', '/Users/brett/testing1234')
# r.engine = 'CYCLES'
r.start()
while r.is_running():
time.sleep(1)
# logging.basicConfig(format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S', level=logging.DEBUG)
# r = BlenderRenderWorker('/Users/brett/Blender Files/temple_animatic.blend', '/Users/brett/testing1234')
# # r.engine = 'CYCLES'
# r.start()
# while r.is_running():
# time.sleep(1)

View File

@@ -1,27 +1,9 @@
#!/usr/bin/env python3
import re
import time
from .render_worker import *
class FFMPEGRenderWorker(BaseRenderWorker):
renderer = 'ffmpeg'
render_engine = 'ffmpeg'
def __init__(self, input_path, output_path, args=None):
super(FFMPEGRenderWorker, self).__init__(input_path=input_path, output_path=output_path, ignore_extensions=True,
args=args)
stream_info = subprocess.check_output([self.renderer_path(), "-i", # https://stackoverflow.com/a/61604105
input_path, "-map", "0:v:0", "-c", "copy", "-f", "null", "-y",
"/dev/null"], stderr=subprocess.STDOUT).decode('utf-8')
found_frames = re.findall('frame=\s*(\d+)', stream_info)
self.total_frames = found_frames[-1] if found_frames else '-1'
self.frame = 0
# Stats
self.current_frame = -1
class FFMPEG(BaseRenderEngine):
@classmethod
def version(cls):
@@ -35,18 +17,58 @@ class FFMPEGRenderWorker(BaseRenderWorker):
logger.error("Failed to get FFMPEG version: {}".format(e))
return version
@classmethod
def get_encoders(cls):
encoders_raw = subprocess.check_output([cls.renderer_path(), '-encoders'], stderr=subprocess.DEVNULL).decode('utf-8')
pattern = '(?P<type>[VASFXBD.]{6})\s+(?P<name>\S{2,})\s+(?P<description>.*)'
encoders = [m.groupdict() for m in re.finditer(pattern, encoders_raw)]
return encoders
@classmethod
def get_formats(cls):
formats_raw = subprocess.check_output([cls.renderer_path(), '-formats'], stderr=subprocess.DEVNULL).decode('utf-8')
pattern = '(?P<type>[DE]{1,2})\s+(?P<name>\S{2,})\s+(?P<description>.*)'
formats = [m.groupdict() for m in re.finditer(pattern, formats_raw)]
return formats
@classmethod
def full_report(cls):
return {'version': cls.version(),
'help_text': cls.get_help(),
'encoders': cls.get_encoders(),
'formats': cls.get_formats()}
class FFMPEGRenderWorker(BaseRenderWorker):
engine = FFMPEG
def __init__(self, input_path, output_path, args=None):
super(FFMPEGRenderWorker, self).__init__(input_path=input_path, output_path=output_path, ignore_extensions=True,
args=args)
stream_info = subprocess.check_output([self.engine.renderer_path(), "-i", # https://stackoverflow.com/a/61604105
input_path, "-map", "0:v:0", "-c", "copy", "-f", "null", "-y",
"/dev/null"], stderr=subprocess.DEVNULL).decode('utf-8')
found_frames = re.findall('frame=\s*(\d+)', stream_info)
self.total_frames = found_frames[-1] if found_frames else '-1'
self.frame = 0
# Stats
self.current_frame = -1
def generate_worker_subprocess(self):
cmd = [self.renderer_path(), '-y', '-stats', '-i', self.input_path]
cmd = [self.engine.renderer_path(), '-y', '-stats', '-i', self.input_path]
# Resize frame
if self.args.get('x_resolution', None) and self.args.get('y_resolution', None):
cmd.extend(['-vf', f"scale={self.args['x_resolution']}:{self.args['y_resolution']}"])
# Convert raw args from string if available
raw_args = self.get_raw_args()
raw_args = self.args.get('raw', None)
if raw_args:
cmd.extend(raw_args)
cmd.extend(raw_args.split(' '))
# Close with output path
cmd.append(self.output_path)
@@ -67,12 +89,15 @@ class FFMPEGRenderWorker(BaseRenderWorker):
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S', level=logging.DEBUG)
test_movie = '/Users/brettwilliams/Desktop/dark_knight_rises.mp4'
print(FFMPEG.full_report())
r = FFMPEGRenderWorker(test_movie, '/Users/brettwilliams/Desktop/test-ffmpeg.mp4', args=['-c:v', 'libx265', '-vtag', 'hvc1'])
# r = FFMPEGRenderer(test_movie, '/Users/brettwilliams/Desktop/dark_knight_rises-output.mp4')
r.start()
while r.is_running():
time.sleep(1)
# logging.basicConfig(format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S', level=logging.DEBUG)
#
# test_movie = '/Users/brettwilliams/Desktop/dark_knight_rises.mp4'
#
# r = FFMPEGRenderWorker(test_movie, '/Users/brettwilliams/Desktop/test-ffmpeg.mp4', args=['-c:v', 'libx265', '-vtag', 'hvc1'])
# # r = FFMPEGRenderer(test_movie, '/Users/brettwilliams/Desktop/dark_knight_rises-output.mp4')
# r.start()
# while r.is_running():
# time.sleep(1)

View File

@@ -30,27 +30,24 @@ def string_to_status(string):
class BaseRenderWorker(object):
renderer = 'BaseRenderWorker'
render_engine = None
render_engine_version = None
supported_extensions = []
install_paths = []
supported_export_formats = []
engine = None
def __init__(self, input_path, output_path, args=None, ignore_extensions=True):
if not ignore_extensions:
if not any(ext in input_path for ext in self.supported_extensions):
err_meg = f'Cannot find valid project with supported file extension for {self.renderer} renderer'
if not any(ext in input_path for ext in self.engine.supported_extensions):
err_meg = f'Cannot find valid project with supported file extension for {self.engine.name()} renderer'
logger.error(err_meg)
raise ValueError(err_meg)
if not self.engine:
raise NotImplementedError("Engine not defined")
# Essential Info
self.input_path = input_path
self.output_path = output_path
self.args = args or {}
self.date_created = datetime.now()
self.renderer_version = self.version()
self.renderer_version = self.engine.version()
# Frame Ranges
self.total_frames = 0
@@ -75,23 +72,6 @@ class BaseRenderWorker(object):
self.is_finished = False
self.last_output = None
@classmethod
def version(cls):
raise NotImplementedError("Unknown version")
@classmethod
def renderer_path(cls):
path = None
try:
path = subprocess.check_output(['which', cls.render_engine]).decode('utf-8').strip()
except subprocess.CalledProcessError:
for p in cls.install_paths:
if os.path.exists(p):
path = p
except Exception as e:
logging.exception(e)
return path
def validate(self):
if not os.path.exists(self.input_path):
raise FileNotFoundError(f"Cannot find input path: {self.input_path}")
@@ -128,15 +108,15 @@ class BaseRenderWorker(object):
self.errors.append(msg)
return
if not self.renderer_path():
if not self.engine.renderer_path():
self.status = RenderStatus.ERROR
msg = 'Cannot find render engine path for {}'.format(self.render_engine)
msg = 'Cannot find render engine path for {}'.format(self.engine.name())
logger.error(msg)
self.errors.append(msg)
return
self.status = RenderStatus.RUNNING
logger.info('Starting {0} {1} Render for {2}'.format(self.renderer, self.version(), self.input_path))
logger.info(f'Starting {self.engine.name()} {self.engine.version()} Render for {self.input_path}')
self.__thread.start()
def run(self):
@@ -166,12 +146,12 @@ class BaseRenderWorker(object):
with open(self.log_path, "a") as f:
f.write("{3} - Starting {0} {1} Render for {2}\n".format(self.renderer, self.version(), self.input_path,
self.start_time.isoformat()))
f.write(f"{self.start_time.isoformat()} - Starting {self.engine.name()} {self.engine.version()} "
f"Render for {self.input_path}")
f.write(f"Running command: {' '.join(subprocess_cmds)}\n")
for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8"): # or another encoding
f.write(c)
logger.debug(f"{self.renderer}Worker: {c.strip()}")
logger.debug(f"{self.engine.name()}Worker: {c.strip()}")
self.last_output = c.strip()
self._parse_stdout(c.strip())
f.write('\n')
@@ -187,18 +167,19 @@ class BaseRenderWorker(object):
duration = self.end_time - self.start_time
if return_code:
message = f"{self.renderer} render failed with return_code {return_code} after {duration}"
message = f"{self.engine.name()} render failed with return_code {return_code} after {duration}"
logger.error(message)
self.failed_attempts = self.failed_attempts + 1
else:
message = f"{self.renderer} render completed successfully in {duration}"
message = f"{self.engine.name()} render completed successfully in {duration}"
logger.info(message)
self.status = RenderStatus.COMPLETED
f.write(message)
if self.failed_attempts >= self.maximum_attempts and self.status is not RenderStatus.CANCELLED:
logger.error('{} Render of {} failed after {} attempts'.format(self.renderer, self.input_path, self.failed_attempts))
logger.error('{} Render of {} failed after {} attempts'.format(self.engine.name(), self.input_path,
self.failed_attempts))
self.status = RenderStatus.ERROR
if not self.errors:
self.errors = [self.last_output]
@@ -237,6 +218,45 @@ class BaseRenderWorker(object):
return elapsed
class BaseRenderEngine(object):
install_paths = []
supported_extensions = []
@classmethod
def name(cls):
return cls.__name__.lower()
@classmethod
def renderer_path(cls):
path = None
try:
path = subprocess.check_output(['which', cls.name()]).decode('utf-8').strip()
except subprocess.CalledProcessError:
for p in cls.install_paths:
if os.path.exists(p):
path = p
except Exception as e:
logging.exception(e)
return path
@classmethod
def version(cls):
raise NotImplementedError("version not implemented")
@classmethod
def get_help(cls):
path = cls.renderer_path()
if not path:
raise FileNotFoundError("renderer path not found")
help_doc = subprocess.check_output([path, '-h'], stderr=subprocess.STDOUT).decode('utf-8')
return help_doc
@classmethod
def get_formats(cls):
raise NotImplementedError("get_formats not implemented")
class RenderWorkerFactory:
@staticmethod
@@ -255,15 +275,14 @@ class RenderWorkerFactory:
@staticmethod
def supported_renderers():
return [x.render_engine for x in RenderWorkerFactory.supported_classes()]
return [x.engine.name() for x in RenderWorkerFactory.supported_classes()]
@staticmethod
def class_for_name(name):
name = name.lower()
for render_class in RenderWorkerFactory.supported_classes():
if render_class.render_engine == name:
if render_class.engine.name() == name:
return render_class
raise LookupError(f'Cannot find class for name: {name}')

View File

@@ -12,7 +12,7 @@ from .render_workers.render_worker import RenderStatus, RenderWorkerFactory
logger = logging.getLogger()
class RenderJob:
class ScheduledJob:
# Get file hash on bg thread
def __get_file_hash(self):

View File

@@ -16,7 +16,7 @@ import yaml
from flask import Flask, request, render_template, send_file, after_this_request, Response, redirect, url_for, abort
from werkzeug.utils import secure_filename
from lib.render_job import RenderJob
from lib.scheduled_job import ScheduledJob
from lib.render_queue import RenderQueue, JobNotFoundError
from lib.render_workers.render_worker import RenderWorkerFactory, string_to_status, RenderStatus
from lib.utilities.server_helper import post_job_to_server, generate_thumbnail_for_job
@@ -309,8 +309,8 @@ def add_job(job_params, remove_job_dir_on_failure=False):
if client == RenderQueue.host_name:
logger.info(f"Creating job locally - {name if name else input_path}")
try:
render_job = RenderJob(renderer, input_path, output_path, args, priority, job_owner, client,
notify=False, custom_id=custom_id, name=name)
render_job = ScheduledJob(renderer, input_path, output_path, args, priority, job_owner, client,
notify=False, custom_id=custom_id, name=name)
RenderQueue.add_to_render_queue(render_job, force_start=force_start)
return render_job.json()
except Exception as e:
@@ -427,11 +427,12 @@ def status():
def renderer_info():
renderer_data = {}
for r in RenderWorkerFactory.supported_renderers():
renderer_class = RenderWorkerFactory.class_for_name(r)
renderer_data[r] = {'available': renderer_class.renderer_path() is not None,
'version': renderer_class.version(),
'supported_extensions': renderer_class.supported_extensions,
'supported_export_formats': renderer_class.supported_export_formats}
engine = RenderWorkerFactory.class_for_name(r).engine
renderer_data[r] = {'available': engine.renderer_path() is not None,
'version': engine.version(),
'supported_extensions': engine.supported_extensions,
'supported_export_formats': engine.get_formats(),
'path': engine.renderer_path()}
return renderer_data