#!/usr/bin/env python3 import io import logging import os import subprocess import threading from datetime import datetime from enum import Enum import psutil logger = logging.getLogger() class RenderStatus(Enum): NOT_STARTED = "not_started" RUNNING = "running" COMPLETED = "completed" CANCELLED = "cancelled" ERROR = "error" SCHEDULED = "scheduled" def string_to_status(string): for stat in RenderStatus: if stat.value == string: return stat return RenderStatus.ERROR class BaseRenderWorker(object): renderer = 'BaseRenderWorker' render_engine = None render_engine_version = None supported_extensions = [] install_paths = [] supported_export_formats = [] def __init__(self, input_path, output_path, args=None, ignore_extensions=True): if not ignore_extensions: if not any(ext in input_path for ext in self.supported_extensions): err_meg = f'Cannot find valid project with supported file extension for {self.renderer} renderer' logger.error(err_meg) raise ValueError(err_meg) # Essential Info self.input_path = input_path self.output_path = output_path self.args = args or {} self.date_created = datetime.now() self.renderer_version = self.version() # Frame Ranges self.total_frames = 0 self.current_frame = 0 # Logging self.log_path = None self.start_time = None self.end_time = None # History self.status = RenderStatus.NOT_STARTED self.warnings = [] self.errors = [] self.failed_attempts = 0 self.maximum_attempts = 1 # Threads and processes self.__thread = threading.Thread(target=self.run, args=()) self.__thread.daemon = True self.__process = None self.is_finished = False self.last_output = None @classmethod def version(cls): raise NotImplementedError("Unknown version") @classmethod def renderer_path(cls): path = None try: path = subprocess.check_output(['which', cls.render_engine]).decode('utf-8').strip() except subprocess.CalledProcessError: for p in cls.install_paths: if os.path.exists(p): path = p except Exception as e: logging.exception(e) return path def validate(self): if not os.path.exists(self.input_path): raise FileNotFoundError(f"Cannot find input path: {self.input_path}") self.generate_subprocess() def generate_subprocess(self): # Convert raw args from string if available and catch conflicts generated_args = self.generate_worker_subprocess() generated_args_flags = [x for x in generated_args if x.startswith('-')] if len(generated_args_flags) != len(set(generated_args_flags)): msg = "Cannot generate subprocess - Multiple arg conflicts detected" logger.error(msg) logger.debug(f"Generated args for subprocess: {generated_args}") raise ValueError(msg) return generated_args def get_raw_args(self): raw_args_string = self.args.get('raw', None) raw_args = None if raw_args_string: import shlex raw_args = shlex.split(raw_args_string) return raw_args def generate_worker_subprocess(self): raise NotImplementedError("generate_worker_subprocess not implemented") def start(self): if not os.path.exists(self.input_path): self.status = RenderStatus.ERROR msg = 'Cannot find input path: {}'.format(self.input_path) logger.error(msg) self.errors.append(msg) return if not self.renderer_path(): self.status = RenderStatus.ERROR msg = 'Cannot find render engine path for {}'.format(self.render_engine) logger.error(msg) self.errors.append(msg) return self.status = RenderStatus.RUNNING logger.info('Starting {0} {1} Render for {2}'.format(self.renderer, self.version(), self.input_path)) self.__thread.start() def run(self): # Setup logging try: if not self.log_path: log_dir = os.path.join(os.path.dirname(self.input_path), 'logs') if not os.path.exists(log_dir): os.makedirs(log_dir) self.log_path = os.path.join(log_dir, os.path.basename(self.input_path)) + '.log' logger.info('Logs saved in {}'.format(self.log_path)) except Exception as e: logger.error("Error setting up logging: {}".format(e)) while self.failed_attempts < self.maximum_attempts and self.status is not RenderStatus.COMPLETED: if self.failed_attempts: logger.info('Attempt #{} failed. Starting attempt #{}'.format(self.failed_attempts, self.failed_attempts + 1)) # Start process and get updates subprocess_cmds = self.generate_subprocess() logger.debug("Renderer commands generated - {}".format(" ".join(subprocess_cmds))) self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=False) self.start_time = datetime.now() with open(self.log_path, "a") as f: f.write("{3} - Starting {0} {1} Render for {2}\n".format(self.renderer, self.version(), self.input_path, self.start_time.isoformat())) f.write(f"Running command: {' '.join(subprocess_cmds)}\n") for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8"): # or another encoding f.write(c) logger.debug(f"{self.renderer}Worker: {c.strip()}") self.last_output = c.strip() self._parse_stdout(c.strip()) f.write('\n') # Check return codes return_code = self.__process.wait() self.end_time = datetime.now() # Return early if job was cancelled if self.status is RenderStatus.CANCELLED: self.is_finished = True return duration = self.end_time - self.start_time if return_code: message = f"{self.renderer} render failed with return_code {return_code} after {duration}" logger.error(message) self.failed_attempts = self.failed_attempts + 1 else: message = f"{self.renderer} render completed successfully in {duration}" logger.info(message) self.status = RenderStatus.COMPLETED f.write(message) if self.failed_attempts >= self.maximum_attempts and self.status is not RenderStatus.CANCELLED: logger.error('{} Render of {} failed after {} attempts'.format(self.renderer, self.input_path, self.failed_attempts)) self.status = RenderStatus.ERROR if not self.errors: self.errors = [self.last_output] self.is_finished = True def is_running(self): if self.__thread: return self.__thread.is_alive() return False def stop(self): if self.__process: try: self.status = RenderStatus.CANCELLED self.maximum_attempts = 0 process = psutil.Process(self.__process.pid) for proc in process.children(recursive=True): proc.kill() process.kill() except Exception as e: logger.error(f"Exception stopping the process: {e}") def percent_complete(self): return 0 def _parse_stdout(self, line): raise NotImplementedError("_parse_stdout not implemented") def elapsed_time(self): elapsed = "" if self.start_time: if self.end_time: elapsed = self.end_time - self.start_time elif self.is_running(): elapsed = datetime.now() - self.start_time return elapsed class RenderWorkerFactory: @staticmethod def supported_classes(): # to add support for any additional RenderWorker classes, import their classes and add to list here from .blender_worker import BlenderRenderWorker from .aerender_worker import AERenderWorker from .ffmpeg_worker import FFMPEGRenderWorker classes = [BlenderRenderWorker, AERenderWorker, FFMPEGRenderWorker] return classes @staticmethod def create_worker(renderer, input_path, output_path, args=None): worker_class = RenderWorkerFactory.class_for_name(renderer) return worker_class(input_path=input_path, output_path=output_path, args=args) @staticmethod def supported_renderers(): return [x.render_engine for x in RenderWorkerFactory.supported_classes()] @staticmethod def class_for_name(name): name = name.lower() for render_class in RenderWorkerFactory.supported_classes(): if render_class.render_engine == name: return render_class raise LookupError(f'Cannot find class for name: {name}') def timecode_to_frames(timecode, frame_rate): e = [int(x) for x in timecode.split(':')] seconds = (((e[0] * 60) + e[1] * 60) + e[2]) frames = (seconds * frame_rate) + e[-1] + 1 return frames