More code re-organizing

This commit is contained in:
Brett Williams
2023-10-21 22:45:30 -05:00
parent 7a52cce40a
commit 9027cd7202
26 changed files with 33 additions and 48 deletions

View File

View File

@@ -0,0 +1,56 @@
import logging
import os
import subprocess
logger = logging.getLogger()
SUBPROCESS_TIMEOUT = 5
class BaseRenderEngine(object):
install_paths = []
supported_extensions = []
def __init__(self, custom_path=None):
self.custom_renderer_path = custom_path
if not self.renderer_path():
raise FileNotFoundError(f"Cannot find path to renderer for {self.name()} instance")
def renderer_path(self):
return self.custom_renderer_path or self.default_renderer_path()
@classmethod
def name(cls):
return cls.__name__.lower()
@classmethod
def default_renderer_path(cls):
path = None
try: # Linux and macOS
path = subprocess.check_output(['which', cls.name()], timeout=SUBPROCESS_TIMEOUT).decode('utf-8').strip()
except (subprocess.CalledProcessError, FileNotFoundError):
for p in cls.install_paths:
if os.path.exists(p):
path = p
except Exception as e:
logger.exception(e)
return path
def version(self):
raise NotImplementedError("version not implemented")
def get_help(self):
path = self.renderer_path()
if not path:
raise FileNotFoundError("renderer path not found")
help_doc = subprocess.check_output([path, '-h'], stderr=subprocess.STDOUT,
timeout=SUBPROCESS_TIMEOUT).decode('utf-8')
return help_doc
@classmethod
def get_output_formats(cls):
raise NotImplementedError(f"get_output_formats not implemented for {cls.__name__}")
@classmethod
def get_arguments(cls):
pass

View File

@@ -0,0 +1,336 @@
#!/usr/bin/env python3
import io
import json
import logging
import os
import subprocess
import threading
from datetime import datetime
import psutil
from pubsub import pub
from sqlalchemy import Column, Integer, String, DateTime, JSON
from sqlalchemy.ext.declarative import declarative_base
from src.utilities.misc_helper import get_time_elapsed
from src.utilities.status_utils import RenderStatus, string_to_status
logger = logging.getLogger()
Base = declarative_base()
class BaseRenderWorker(Base):
__tablename__ = 'render_workers'
id = Column(String, primary_key=True)
input_path = Column(String)
output_path = Column(String)
date_created = Column(DateTime)
start_time = Column(DateTime, nullable=True)
end_time = Column(DateTime, nullable=True)
renderer = Column(String)
renderer_version = Column(String)
renderer_path = Column(String)
priority = Column(Integer)
project_length = Column(Integer)
start_frame = Column(Integer)
end_frame = Column(Integer, nullable=True)
parent = Column(String, nullable=True)
children = Column(JSON)
name = Column(String)
file_hash = Column(String)
_status = Column(String)
engine = None
def __init__(self, input_path, output_path, engine_path, priority=2, args=None, ignore_extensions=True, parent=None,
name=None):
if not ignore_extensions:
if not any(ext in input_path for ext in self.engine.supported_extensions):
err_meg = f'Cannot find valid project with supported file extension for {self.engine.name()} renderer'
logger.error(err_meg)
raise ValueError(err_meg)
if not self.engine:
raise NotImplementedError("Engine not defined")
def generate_id():
import uuid
return str(uuid.uuid4()).split('-')[0]
# Essential Info
self.id = generate_id()
self.input_path = input_path
self.output_path = output_path
self.args = args or {}
self.date_created = datetime.now()
self.renderer = self.engine.name()
self.renderer_path = engine_path
self.renderer_version = self.engine(engine_path).version()
self.custom_renderer_path = None
self.priority = priority
self.parent = parent
self.children = {}
self.name = name or os.path.basename(input_path)
# Frame Ranges
self.project_length = -1
self.current_frame = 0 # should this be a 1 ?
self.start_frame = 0 # should this be a 1 ?
self.end_frame = None
# Logging
self.start_time = None
self.end_time = None
# History
self.status = RenderStatus.CONFIGURING
self.warnings = []
self.errors = []
# Threads and processes
self.__thread = threading.Thread(target=self.run, args=())
self.__thread.daemon = True
self.__process = None
self.last_output = None
@property
def total_frames(self):
return (self.end_frame or self.project_length) - self.start_frame + 1
@property
def status(self):
return self._status
@status.setter
def status(self, new_status):
if self._status != new_status.value:
old_status = self.status
self._status = new_status.value
pub.sendMessage('status_change', job_id=self.id, old_status=old_status, new_status=new_status)
@status.getter
def status(self):
if self._status in [RenderStatus.RUNNING.value, RenderStatus.NOT_STARTED.value]:
if not hasattr(self, 'errors'):
self._status = RenderStatus.CANCELLED.value
return string_to_status(self._status)
def validate(self):
if not os.path.exists(self.input_path):
raise FileNotFoundError(f"Cannot find input path: {self.input_path}")
self.generate_subprocess()
def generate_subprocess(self):
# Convert raw args from string if available and catch conflicts
generated_args = [str(x) for x in self.generate_worker_subprocess()]
generated_args_flags = [x for x in generated_args if x.startswith('-')]
if len(generated_args_flags) != len(set(generated_args_flags)):
msg = "Cannot generate subprocess - Multiple arg conflicts detected"
logger.error(msg)
logger.debug(f"Generated args for subprocess: {generated_args}")
raise ValueError(msg)
return generated_args
def get_raw_args(self):
raw_args_string = self.args.get('raw', None)
raw_args = None
if raw_args_string:
import shlex
raw_args = shlex.split(raw_args_string)
return raw_args
def generate_worker_subprocess(self):
raise NotImplementedError("generate_worker_subprocess not implemented")
def log_path(self):
filename = (self.name or os.path.basename(self.input_path)) + '_' + \
self.date_created.strftime("%Y.%m.%d_%H.%M.%S") + '.log'
return os.path.join(os.path.dirname(self.input_path), filename)
def start(self):
if self.status not in [RenderStatus.SCHEDULED, RenderStatus.NOT_STARTED]:
logger.error(f"Trying to start job with status: {self.status}")
return
if not os.path.exists(self.input_path):
self.status = RenderStatus.ERROR
msg = 'Cannot find input path: {}'.format(self.input_path)
logger.error(msg)
self.errors.append(msg)
return
if not os.path.exists(self.renderer_path):
self.status = RenderStatus.ERROR
msg = 'Cannot find render engine path for {}'.format(self.engine.name())
logger.error(msg)
self.errors.append(msg)
return
self.status = RenderStatus.RUNNING
self.start_time = datetime.now()
logger.info(f'Starting {self.engine.name()} {self.renderer_version} Render for {self.input_path} | '
f'Frame Count: {self.total_frames}')
self.__thread.start()
def run(self):
# Setup logging
log_dir = os.path.dirname(self.log_path())
os.makedirs(log_dir, exist_ok=True)
subprocess_cmds = self.generate_subprocess()
initial_file_count = len(self.file_list())
attempt_number = 0
with open(self.log_path(), "a") as f:
f.write(f"{self.start_time.isoformat()} - Starting {self.engine.name()} {self.renderer_version} "
f"render for {self.input_path}\n\n")
f.write(f"Running command: {subprocess_cmds}\n")
f.write('=' * 80 + '\n\n')
while True:
# Log attempt #
if attempt_number:
f.write(f'\n{"=" * 80} Attempt #{attempt_number} {"=" * 30}\n\n')
logger.warning(f"Restarting render - Attempt #{attempt_number}")
attempt_number += 1
# Start process and get updates
self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
universal_newlines=False)
for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8"): # or another encoding
f.write(c)
self.last_output = c.strip()
self._parse_stdout(c.strip())
f.write('\n')
# Check return codes and process
return_code = self.__process.wait()
self.end_time = datetime.now()
if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: # user cancelled
message = f"{self.engine.name()} render ended with status '{self.status}' " \
f"after {self.time_elapsed()}"
f.write(message)
return
if not return_code:
message = f"{'=' * 50}\n\n{self.engine.name()} render completed successfully in {self.time_elapsed()}"
f.write(message)
break
# Handle non-zero return codes
message = f"{'=' * 50}\n\n{self.engine.name()} render failed with code {return_code} " \
f"after {self.time_elapsed()}"
f.write(message)
self.errors.append(message)
# if file output hasn't increased, return as error, otherwise restart process.
if len(self.file_list()) <= initial_file_count:
self.status = RenderStatus.ERROR
return
if self.children:
from src.distributed_job_manager import DistributedJobManager
DistributedJobManager.wait_for_subjobs(local_job=self)
# Post Render Work
logger.debug("Starting post-processing work")
self.post_processing()
self.status = RenderStatus.COMPLETED
logger.info(f"Render {self.id}-{self.name} completed successfully after {self.time_elapsed()}")
def post_processing(self):
pass
def is_running(self):
if self.__thread:
return self.__thread.is_alive()
return False
def log_error(self, error_line, halt_render=False):
logger.error(error_line)
self.errors.append(error_line)
if halt_render:
self.stop(is_error=True)
def stop(self, is_error=False):
if hasattr(self, '__process'):
try:
process = psutil.Process(self.__process.pid)
for proc in process.children(recursive=True):
proc.kill()
process.kill()
except Exception as e:
logger.debug(f"Error stopping the process: {e}")
if self.status in [RenderStatus.RUNNING, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED]:
if is_error:
err_message = self.errors[-1] if self.errors else 'Unknown error'
logger.error(f"Halting render due to error: {err_message}")
self.status = RenderStatus.ERROR
else:
self.status = RenderStatus.CANCELLED
def percent_complete(self):
return 0
def _parse_stdout(self, line):
raise NotImplementedError("_parse_stdout not implemented")
def time_elapsed(self):
return get_time_elapsed(self.start_time, self.end_time)
def file_list(self):
try:
job_dir = os.path.dirname(self.output_path)
file_list = [os.path.join(job_dir, file) for file in os.listdir(job_dir)]
file_list.sort()
return file_list
except FileNotFoundError:
return []
def json(self):
job_dict = {
'id': self.id,
'name': self.name,
'input_path': self.input_path,
'output_path': self.output_path,
'priority': self.priority,
'parent': self.parent,
'children': self.children,
'date_created': self.date_created,
'start_time': self.start_time,
'end_time': self.end_time,
'status': self.status.value,
'file_hash': self.file_hash,
'percent_complete': self.percent_complete(),
'file_count': len(self.file_list()),
'renderer': self.renderer,
'renderer_version': self.renderer_version,
'errors': getattr(self, 'errors', None),
'start_frame': self.start_frame,
'end_frame': self.end_frame,
'total_frames': self.total_frames,
'last_output': getattr(self, 'last_output', None),
'log_path': self.log_path()
}
# convert to json and back to auto-convert dates to iso format
def date_serializer(o):
if isinstance(o, datetime):
return o.isoformat()
json_convert = json.dumps(job_dict, default=date_serializer)
worker_json = json.loads(json_convert)
return worker_json
def timecode_to_frames(timecode, frame_rate):
e = [int(x) for x in timecode.split(':')]
seconds = (((e[0] * 60) + e[1] * 60) + e[2])
frames = (seconds * frame_rate) + e[-1] + 1
return frames

View File

@@ -0,0 +1,138 @@
import logging
import os
import shutil
import tarfile
import tempfile
import zipfile
import requests
from tqdm import tqdm
supported_formats = ['.zip', '.tar.xz', '.dmg']
logger = logging.getLogger()
def download_and_extract_app(remote_url, download_location):
# Create a temp download directory
temp_download_dir = tempfile.mkdtemp()
temp_downloaded_file_path = os.path.join(temp_download_dir, os.path.basename(remote_url))
try:
output_dir_name = os.path.basename(remote_url)
for fmt in supported_formats:
output_dir_name = output_dir_name.split(fmt)[0]
if os.path.exists(os.path.join(download_location, output_dir_name)):
logger.error(f"Engine download for {output_dir_name} already exists")
return
if not os.path.exists(temp_downloaded_file_path):
# Make a GET request to the URL with stream=True to enable streaming
logger.info(f"Downloading {output_dir_name} from {remote_url}")
response = requests.get(remote_url, stream=True)
# Check if the request was successful
if response.status_code == 200:
# Get the total file size from the "Content-Length" header
file_size = int(response.headers.get("Content-Length", 0))
# Create a progress bar using tqdm
progress_bar = tqdm(total=file_size, unit="B", unit_scale=True)
# Open a file for writing in binary mode
with open(temp_downloaded_file_path, "wb") as file:
for chunk in response.iter_content(chunk_size=1024):
if chunk:
# Write the chunk to the file
file.write(chunk)
# Update the progress bar
progress_bar.update(len(chunk))
# Close the progress bar
progress_bar.close()
logger.info(f"Successfully downloaded {os.path.basename(temp_downloaded_file_path)}")
else:
logger.error(f"Failed to download the file. Status code: {response.status_code}")
os.makedirs(download_location, exist_ok=True)
# Extract the downloaded file
# Process .tar.xz files
if temp_downloaded_file_path.lower().endswith('.tar.xz'):
try:
with tarfile.open(temp_downloaded_file_path, 'r:xz') as tar:
tar.extractall(path=download_location)
logger.info(
f'Successfully extracted {os.path.basename(temp_downloaded_file_path)} to {download_location}')
except tarfile.TarError as e:
logger.error(f'Error extracting {temp_downloaded_file_path}: {e}')
except FileNotFoundError:
logger.error(f'File not found: {temp_downloaded_file_path}')
# Process .zip files
elif temp_downloaded_file_path.lower().endswith('.zip'):
try:
with zipfile.ZipFile(temp_downloaded_file_path, 'r') as zip_ref:
zip_ref.extractall(download_location)
logger.info(
f'Successfully extracted {os.path.basename(temp_downloaded_file_path)} to {download_location}')
except zipfile.BadZipFile as e:
logger.error(f'Error: {temp_downloaded_file_path} is not a valid ZIP file.')
except FileNotFoundError:
logger.error(f'File not found: {temp_downloaded_file_path}')
# Process .dmg files (macOS only)
elif temp_downloaded_file_path.lower().endswith('.dmg'):
import dmglib
dmg = dmglib.DiskImage(temp_downloaded_file_path)
for mount_point in dmg.attach():
try:
copy_directory_contents(mount_point, os.path.join(download_location, output_dir_name))
logger.info(f'Successfully copied {os.path.basename(temp_downloaded_file_path)} to {download_location}')
except FileNotFoundError:
logger.error(f'Error: The source .app bundle does not exist.')
except PermissionError:
logger.error(f'Error: Permission denied to copy {download_location}.')
except Exception as e:
logger.error(f'An error occurred: {e}')
dmg.detach()
else:
logger.error("Unknown file. Unable to extract binary.")
except Exception as e:
logger.exception(e)
# remove downloaded file on completion
shutil.rmtree(temp_download_dir)
return download_location
# Function to copy directory contents but ignore symbolic links and hidden files
def copy_directory_contents(src_dir, dest_dir):
try:
# Create the destination directory if it doesn't exist
os.makedirs(dest_dir, exist_ok=True)
for item in os.listdir(src_dir):
item_path = os.path.join(src_dir, item)
# Ignore symbolic links
if os.path.islink(item_path):
continue
# Ignore hidden files or directories (those starting with a dot)
if not item.startswith('.'):
dest_item_path = os.path.join(dest_dir, item)
# If it's a directory, recursively copy its contents
if os.path.isdir(item_path):
copy_directory_contents(item_path, dest_item_path)
else:
# Otherwise, copy the file
shutil.copy2(item_path, dest_item_path)
except Exception as e:
logger.exception(f"Error copying directory contents: {e}")

View File

@@ -0,0 +1,50 @@
import logging
from src.engines.engine_manager import EngineManager
logger = logging.getLogger()
class RenderWorkerFactory:
@staticmethod
def supported_classes():
# to add support for any additional RenderWorker classes, import their classes and add to list here
from src.engines.blender.blender_worker import BlenderRenderWorker
from src.engines.aerender.aerender_worker import AERenderWorker
from src.engines.ffmpeg.ffmpeg_worker import FFMPEGRenderWorker
classes = [BlenderRenderWorker, AERenderWorker, FFMPEGRenderWorker]
return classes
@staticmethod
def create_worker(renderer, input_path, output_path, engine_version=None, args=None, parent=None, name=None):
worker_class = RenderWorkerFactory.class_for_name(renderer)
# find correct engine version
all_versions = EngineManager.all_versions_for_engine(renderer)
if not all_versions:
raise FileNotFoundError(f"Cannot find any installed {renderer} engines")
engine_path = all_versions[0]['path']
if engine_version:
for ver in all_versions:
if ver['version'] == engine_version:
engine_path = ver['path']
break
if not engine_path:
logger.warning(f"Cannot find requested engine version {engine_version}. Using default version {all_versions[0]['version']}")
return worker_class(input_path=input_path, output_path=output_path, engine_path=engine_path, args=args,
parent=parent, name=name)
@staticmethod
def supported_renderers():
return [x.engine.name() for x in RenderWorkerFactory.supported_classes()]
@staticmethod
def class_for_name(name):
name = name.lower()
for render_class in RenderWorkerFactory.supported_classes():
if render_class.engine.name() == name:
return render_class
raise LookupError(f'Cannot find class for name: {name}')