#!/usr/bin/env python3
"""Base worker that runs a render engine as a subprocess and tracks its state."""
import io
import logging
import os
import subprocess
import threading
import json
from datetime import datetime
from enum import Enum

try:
    import psutil
except ImportError:  # only needed by BaseRenderWorker.stop() to kill child process trees
    psutil = None

logger = logging.getLogger()


class RenderStatus(Enum):
    """Lifecycle states of a render job; values are the serialized names."""

    NOT_STARTED = "not_started"
    RUNNING = "running"
    COMPLETED = "completed"
    CANCELLED = "cancelled"
    ERROR = "error"
    SCHEDULED = "scheduled"
    UNDEFINED = "undefined"


def string_to_status(string):
    """Return the RenderStatus whose value equals *string*.

    Unrecognized input maps to ``RenderStatus.ERROR``.
    """
    return next((stat for stat in RenderStatus if stat.value == string), RenderStatus.ERROR)


class BaseRenderWorker(object):
    """Run one render as a subprocess on a background thread.

    Subclasses must set the class attribute ``engine`` and implement
    ``generate_worker_subprocess`` and ``_parse_stdout``.
    """

    engine = None

    def __init__(self, input_path, output_path, args=None, ignore_extensions=True):
        """Initialize the worker state.

        Args:
            input_path: Path of the project file to render.
            output_path: Destination path for the render output.
            args: Optional dict of extra arguments (``'raw'`` holds a raw
                command-line string, see ``get_raw_args``).
            ignore_extensions: When False, ``input_path`` must contain one of
                ``engine.supported_extensions``.

        Raises:
            NotImplementedError: If the subclass did not define ``engine``.
            ValueError: If the extension check is enabled and fails.
        """
        # Check the engine first: the extension test below dereferences it.
        if not self.engine:
            raise NotImplementedError("Engine not defined")

        if not ignore_extensions:
            if not any(ext in input_path for ext in self.engine.supported_extensions):
                err_meg = f'Cannot find valid project with supported file extension for {self.engine.name()} renderer'
                logger.error(err_meg)
                raise ValueError(err_meg)

        # Essential Info
        self.input_path = input_path
        self.output_path = output_path
        self.args = args or {}
        self.date_created = datetime.now()
        self.renderer_version = self.engine.version()

        # Frame Ranges
        self.total_frames = 0
        self.current_frame = 0

        # Logging
        self.log_path = None
        self.start_time = None
        self.end_time = None

        # History
        self.status = RenderStatus.NOT_STARTED
        self.warnings = []
        self.errors = []
        self.halt_on_errors = True
        self.failed_attempts = 0
        self.maximum_attempts = 1

        # Threads and processes
        self.__thread = threading.Thread(target=self.run, args=())
        self.__thread.daemon = True
        self.__process = None
        self.is_finished = False
        self.last_output = None

    def validate(self):
        """Check that the input exists and the subprocess command can be built.

        Raises:
            FileNotFoundError: If ``input_path`` does not exist.
            ValueError: If the generated arguments contain duplicate flags.
        """
        if not os.path.exists(self.input_path):
            raise FileNotFoundError(f"Cannot find input path: {self.input_path}")
        self.generate_subprocess()

    def generate_subprocess(self):
        """Return the subprocess argument list, rejecting duplicated flags.

        Raises:
            ValueError: If any argument starting with ``-`` appears more than once.
        """
        # Convert raw args from string if available and catch conflicts
        generated_args = self.generate_worker_subprocess()
        generated_args_flags = [x for x in generated_args if x.startswith('-')]
        if len(generated_args_flags) != len(set(generated_args_flags)):
            msg = "Cannot generate subprocess - Multiple arg conflicts detected"
            logger.error(msg)
            logger.debug(f"Generated args for subprocess: {generated_args}")
            raise ValueError(msg)
        return generated_args

    def get_raw_args(self):
        """Return ``args['raw']`` split shell-style into a list, or None if unset/empty."""
        raw_args_string = self.args.get('raw', None)
        raw_args = None
        if raw_args_string:
            import shlex
            raw_args = shlex.split(raw_args_string)
        return raw_args

    def generate_worker_subprocess(self):
        """Return the command-line argument list for the engine (subclass hook)."""
        raise NotImplementedError("generate_worker_subprocess not implemented")

    def start(self):
        """Start the render thread, or record an error if prerequisites are missing.

        Sets ``status`` to ERROR and appends to ``errors`` when the input path
        or the engine's renderer path cannot be found.
        """
        if not os.path.exists(self.input_path):
            self.status = RenderStatus.ERROR
            msg = 'Cannot find input path: {}'.format(self.input_path)
            logger.error(msg)
            self.errors.append(msg)
            return

        if not self.engine.renderer_path():
            self.status = RenderStatus.ERROR
            msg = 'Cannot find render engine path for {}'.format(self.engine.name())
            logger.error(msg)
            self.errors.append(msg)
            return

        self.status = RenderStatus.RUNNING
        logger.info(f'Starting {self.engine.name()} {self.engine.version()} Render for {self.input_path}')
        self.__thread.start()

    def run(self):
        """Thread body: run the renderer, log its output and update the status.

        The subprocess is retried until it succeeds or ``failed_attempts``
        reaches ``maximum_attempts``. Every output line is appended to the log
        file and passed to ``_parse_stdout``.
        """
        # Setup logging
        try:
            if not self.log_path:
                log_dir = os.path.join(os.path.dirname(self.input_path), 'logs')
                if not os.path.exists(log_dir):
                    os.makedirs(log_dir)
                self.log_path = os.path.join(log_dir, os.path.basename(self.input_path)) + '.log'
            logger.info('Logs saved in {}'.format(self.log_path))
        except Exception as e:
            logger.error("Error setting up logging: {}".format(e))

        while self.failed_attempts < self.maximum_attempts and self.status is not RenderStatus.COMPLETED:

            if self.failed_attempts:
                logger.info(f'Attempt #{self.failed_attempts} failed. Starting attempt #{self.failed_attempts + 1}')

            # Start process and get updates
            subprocess_cmds = self.generate_subprocess()
            logger.debug("Renderer commands generated - {}".format(" ".join(subprocess_cmds)))
            self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                                              universal_newlines=False)
            self.start_time = datetime.now()

            with open(self.log_path, "a") as f:
                # Each header entry ends with a newline so log lines do not run together.
                f.write(f"{self.start_time.isoformat()} - Starting {self.engine.name()} {self.engine.version()} "
                        f"Render for {self.input_path}\n")
                f.write(f"Running command: {' '.join(subprocess_cmds)}\n")

                # errors="replace": undecodable renderer output must not kill the worker thread.
                for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8", errors="replace"):
                    f.write(c)
                    logger.debug(f"{self.engine.name()}Worker: {c.strip()}")
                    self.last_output = c.strip()
                    self._parse_stdout(c.strip())
                f.write('\n')

                # Check return codes
                return_code = self.__process.wait()
                self.end_time = datetime.now()

                # Return early if job was cancelled
                if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
                    self.is_finished = True
                    return

                duration = self.end_time - self.start_time

                if return_code:
                    message = f"{self.engine.name()} render failed with return_code {return_code} after {duration}"
                    logger.error(message)
                    self.failed_attempts = self.failed_attempts + 1
                else:
                    message = f"{self.engine.name()} render completed successfully in {duration}"
                    logger.info(message)
                    self.status = RenderStatus.COMPLETED

                # Trailing newline keeps a retry (or a later run) from continuing this line.
                f.write(message + '\n')

        if self.failed_attempts >= self.maximum_attempts and self.status is not RenderStatus.CANCELLED:
            logger.error('{} Render of {} failed after {} attempts'.format(self.engine.name(), self.input_path,
                                                                           self.failed_attempts))
            self.status = RenderStatus.ERROR
            if not self.errors:
                self.errors = [self.last_output]
        self.is_finished = True

    def is_running(self):
        """Return True while the worker thread is alive."""
        if self.__thread:
            return self.__thread.is_alive()
        return False

    def log_error(self, error_line):
        """Record an error line and stop the render when ``halt_on_errors`` is set."""
        logger.error(error_line)
        self.errors.append(error_line)
        if self.halt_on_errors:
            self.stop(is_error=True)

    def stop(self, is_error=False):
        """Kill the render subprocess (and its children) and mark the job stopped.

        Args:
            is_error: When True an active job becomes ERROR, otherwise CANCELLED.

        Does nothing if no subprocess has been started. Failures while killing
        are logged at debug level and otherwise ignored (best effort).
        """
        if not self.__process:
            return
        try:
            if self.status in [RenderStatus.RUNNING, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED]:
                if is_error:
                    err_message = self.errors[-1] if self.errors else 'Unknown error'
                    logger.error(f"Halting render due to error: {err_message}")
                    self.status = RenderStatus.ERROR
                else:
                    self.status = RenderStatus.CANCELLED

            # Prevent run() from starting another attempt.
            self.maximum_attempts = 0

            if psutil is None:
                # Without psutil only the direct child can be terminated.
                self.__process.kill()
                return

            process = psutil.Process(self.__process.pid)
            for proc in process.children(recursive=True):
                proc.kill()
            process.kill()
        except Exception as e:
            logger.debug(f"Error stopping the process: {e}")

    def percent_complete(self):
        """Return render progress; the base implementation always returns 0."""
        return 0

    def _parse_stdout(self, line):
        """Handle one stripped line of renderer output (subclass hook)."""
        raise NotImplementedError("_parse_stdout not implemented")

    def time_elapsed(self):
        """Return the elapsed render time as ``HH:MM:SS``, or ``"Unknown"``.

        Uses ``end_time - start_time`` when the render has ended, or the time
        since ``start_time`` while the status is RUNNING. Hours include whole
        days, so renders longer than 24 hours are reported correctly.
        """
        if not self.start_time:
            return "Unknown"

        if self.end_time:
            elapsed = self.end_time - self.start_time
        elif self.status == RenderStatus.RUNNING:
            elapsed = datetime.now() - self.start_time
        else:
            return "Unknown"

        hours, remainder = divmod(int(elapsed.total_seconds()), 3600)
        minutes, seconds = divmod(remainder, 60)
        return f"{hours:02d}:{minutes:02d}:{seconds:02d}"

    def json(self):
        """Return a JSON-compatible dict describing the worker.

        Includes the public instance attributes plus ``percent_complete``,
        ``time_elapsed`` and the status value. Attributes whose names start
        with ``_`` (thread, process) are omitted. Datetimes become ISO
        strings; other non-serializable values become ``null``.
        """
        hidden_keys = ('thread', 'process')
        worker_data = {key: value for key, value in self.__dict__.items()
                       if not key.startswith('_') and key not in hidden_keys}
        worker_data['percent_complete'] = self.percent_complete()
        worker_data['time_elapsed'] = self.time_elapsed()
        worker_data['status'] = self.status.value

        # convert to json and back to auto-convert dates to iso format
        def date_serializer(o):
            if isinstance(o, datetime):
                return o.isoformat()
            return None

        return json.loads(json.dumps(worker_data, default=date_serializer))


def timecode_to_frames(timecode, frame_rate):
    """Convert an ``HH:MM:SS:FF`` timecode to a frame count.

    Args:
        timecode: Colon-separated integers; the first three fields are hours,
            minutes and seconds and the last field is the frame offset.
        frame_rate: Frames per second.

    Returns:
        ``total_seconds * frame_rate + frame_offset + 1``.
    """
    e = [int(x) for x in timecode.split(':')]
    # hours -> minutes -> seconds
    seconds = ((e[0] * 60) + e[1]) * 60 + e[2]
    frames = (seconds * frame_rate) + e[-1] + 1
    return frames