#!/usr/bin/env python3
import io
import json
import logging
import os
import shlex
import subprocess
import threading
from datetime import datetime
from enum import Enum

import psutil
from sqlalchemy import Column, Integer, String, DateTime, JSON, event
from sqlalchemy.ext.declarative import declarative_base

logger = logging.getLogger()
Base = declarative_base()


class RenderStatus(Enum):
    """Lifecycle states a render worker can be in."""

    NOT_STARTED = "not_started"
    RUNNING = "running"
    COMPLETED = "completed"
    CANCELLED = "cancelled"
    ERROR = "error"
    SCHEDULED = "scheduled"
    UNDEFINED = "undefined"


def string_to_status(string):
    """Return the RenderStatus whose value equals *string*.

    Unrecognised input maps to ``RenderStatus.ERROR`` rather than raising.
    """
    return next((stat for stat in RenderStatus if stat.value == string), RenderStatus.ERROR)


class BaseRenderWorker(Base):
    """Base class for a worker that runs one render job in a subprocess.

    Subclasses must set the class attribute ``engine`` and implement
    ``generate_worker_subprocess`` and ``_parse_stdout``. The engine object is
    used here through ``name()``, ``version()``, ``renderer_path()`` and
    ``supported_extensions``.
    """

    __tablename__ = 'render_workers'

    id = Column(Integer, primary_key=True)
    input_path = Column(String)
    output_path = Column(String)
    date_created = Column(DateTime)
    renderer = Column(String)
    renderer_version = Column(String)
    priority = Column(Integer)
    owner = Column(String)
    client = Column(String)
    name = Column(String)
    file_hash = Column(String)

    # Render engine used by the worker; None here, so subclasses must override it.
    engine = None

    def __init__(self, input_path, output_path, priority=2, args=None, ignore_extensions=True, owner=None,
                 client=None, name=None):
        """Create a worker for *input_path* that has not been started yet.

        Args:
            input_path: Path of the project file to render.
            output_path: Path the render output is written to.
            priority: Integer priority stored on the worker (default 2).
            args: Optional dict of extra arguments; ``args['raw']`` is read by
                ``get_raw_args``. ``None`` becomes an empty dict.
            ignore_extensions: When False, *input_path* must contain one of
                the engine's supported extensions.
            owner: Optional owner identifier.
            client: Optional client identifier.
            name: Optional display name.

        Raises:
            NotImplementedError: If the class has no ``engine`` defined.
            ValueError: If extension checking is enabled and no supported
                extension is found in *input_path*.
        """
        # Check the engine first: the extension check below dereferences it, and
        # a missing engine should be reported as such, not as an AttributeError.
        if not self.engine:
            raise NotImplementedError("Engine not defined")

        if not ignore_extensions:
            if not any(ext in input_path for ext in self.engine.supported_extensions):
                err_meg = f'Cannot find valid project with supported file extension for {self.engine.name()} renderer'
                logger.error(err_meg)
                raise ValueError(err_meg)

        # Essential Info
        self.input_path = input_path
        self.output_path = output_path
        self.args = args or {}
        self.date_created = datetime.now()
        self.renderer = self.engine.name()
        self.renderer_version = self.engine.version()
        self.priority = priority
        self.owner = owner
        self.client = client
        self.name = name

        # Frame Ranges
        self.total_frames = 0
        self.current_frame = 0

        # Logging
        self.log_path = None
        self.start_time = None
        self.end_time = None

        # History
        self.status = RenderStatus.NOT_STARTED
        self.warnings = []
        self.errors = []
        self.failed_attempts = 0
        self.maximum_attempts = 1

        # Threads and processes
        self.__thread = threading.Thread(target=self.run, args=())
        self.__thread.daemon = True
        self.__process = None
        self.is_finished = False
        self.last_output = None

    def validate(self):
        """Check that the job can be started.

        Raises:
            FileNotFoundError: If the input path does not exist.
            ValueError: If the generated subprocess arguments conflict.
        """
        if not os.path.exists(self.input_path):
            raise FileNotFoundError(f"Cannot find input path: {self.input_path}")
        self.generate_subprocess()

    def generate_subprocess(self):
        """Return the subprocess argument list, rejecting duplicated flags.

        Raises:
            ValueError: If any argument starting with ``-`` appears more than once.
        """
        # Convert raw args from string if available and catch conflicts
        generated_args = self.generate_worker_subprocess()
        generated_args_flags = [x for x in generated_args if x.startswith('-')]
        if len(generated_args_flags) != len(set(generated_args_flags)):
            msg = "Cannot generate subprocess - Multiple arg conflicts detected"
            logger.error(msg)
            logger.debug(f"Generated args for subprocess: {generated_args}")
            raise ValueError(msg)
        return generated_args

    def get_raw_args(self):
        """Return ``args['raw']`` split shell-style into a list, or None if unset/empty."""
        raw_args_string = self.args.get('raw', None)
        if not raw_args_string:
            return None
        return shlex.split(raw_args_string)

    def generate_worker_subprocess(self):
        """Build the renderer command line as a list of strings; subclasses must implement."""
        raise NotImplementedError("generate_worker_subprocess not implemented")

    def start(self):
        """Start the render on the worker's background thread.

        If the input file or the engine's renderer path is missing, the status
        is set to ERROR, the message is appended to ``errors`` and nothing is
        started.
        """
        if not os.path.exists(self.input_path):
            self.status = RenderStatus.ERROR
            msg = 'Cannot find input path: {}'.format(self.input_path)
            logger.error(msg)
            self.errors.append(msg)
            return

        if not self.engine.renderer_path():
            self.status = RenderStatus.ERROR
            msg = 'Cannot find render engine path for {}'.format(self.engine.name())
            logger.error(msg)
            self.errors.append(msg)
            return

        self.status = RenderStatus.RUNNING
        logger.info(f'Starting {self.engine.name()} {self.engine.version()} Render for {self.input_path}')
        self.__thread.start()

    def run(self):
        """Thread target: run the renderer until it succeeds or attempts run out.

        Each attempt launches the subprocess, copies its combined
        stdout/stderr to the log file line by line, and feeds every stripped
        line to ``_parse_stdout``. A non-zero return code counts as a failed
        attempt. Returns early, without post-processing, if the status became
        CANCELLED or ERROR while the process ran.
        """
        # Setup logging
        try:
            if not self.log_path:
                log_dir = os.path.join(os.path.dirname(self.input_path), 'logs')
                if not os.path.exists(log_dir):
                    os.makedirs(log_dir)
                self.log_path = os.path.join(log_dir, os.path.basename(self.input_path)) + '.log'
            logger.info('Logs saved in {}'.format(self.log_path))
        except Exception as e:
            logger.error("Error setting up logging: {}".format(e))

        while self.failed_attempts < self.maximum_attempts and self.status is not RenderStatus.COMPLETED:

            if self.failed_attempts:
                logger.info(f'Attempt #{self.failed_attempts} failed. Starting attempt #{self.failed_attempts + 1}')

            # Start process and get updates
            subprocess_cmds = self.generate_subprocess()
            logger.debug("Renderer commands generated - {}".format(" ".join(subprocess_cmds)))
            self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                                              universal_newlines=False)
            self.start_time = datetime.now()

            # If log setup failed above, log_path is still None; write to the null
            # device so the already-launched process is still drained and parsed
            # instead of crashing this thread on open(None).
            log_target = self.log_path or os.devnull
            with open(log_target, "a", errors="replace") as f:

                f.write(f"{self.start_time.isoformat()} - Starting {self.engine.name()} {self.engine.version()} "
                        f"Render for {self.input_path}\n")
                f.write(f"Running command: {' '.join(subprocess_cmds)}\n")
                # errors="replace": renderer output is not guaranteed to be valid
                # UTF-8, and a decode error here would kill the worker thread.
                for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8", errors="replace"):
                    f.write(c)
                    logger.debug(f"{self.engine.name()}Worker: {c.strip()}")
                    self.last_output = c.strip()
                    self._parse_stdout(c.strip())
                f.write('\n')

                # Check return codes
                return_code = self.__process.wait()
                self.end_time = datetime.now()

                # Return early if job was cancelled
                if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
                    self.is_finished = True
                    return

                duration = self.end_time - self.start_time

                if return_code:
                    message = f"{self.engine.name()} render failed with return_code {return_code} after {duration}"
                    logger.error(message)
                    self.failed_attempts = self.failed_attempts + 1
                else:
                    message = f"{self.engine.name()} render completed successfully in {duration}"
                    logger.info(message)
                    self.status = RenderStatus.COMPLETED

                # The log is opened in append mode, so end the entry with a newline.
                f.write(message + "\n")

        if self.failed_attempts >= self.maximum_attempts and self.status is not RenderStatus.CANCELLED:
            logger.error('{} Render of {} failed after {} attempts'.format(self.engine.name(), self.input_path,
                                                                           self.failed_attempts))
            self.status = RenderStatus.ERROR
            if not self.errors:
                # No explicit error was recorded; fall back to the last output line.
                self.errors = [self.last_output]
        self.is_finished = True

        self.post_processing()

    def post_processing(self):
        """Hook called at the end of ``run``; does nothing in the base class."""
        pass

    def is_running(self):
        """Return True while the worker thread is alive."""
        if self.__thread:
            return self.__thread.is_alive()
        return False

    def log_error(self, error_line, halt_render=False):
        """Log *error_line*, record it in ``errors`` and optionally halt the render."""
        logger.error(error_line)
        self.errors.append(error_line)
        if halt_render:
            self.stop(is_error=True)

    def stop(self, is_error=False):
        """Kill the render process and its children, if a process was started.

        Args:
            is_error: When True an active job is marked ERROR, otherwise
                CANCELLED. Jobs in any other state keep their status.
        """
        if self.__process:
            try:
                if self.status in [RenderStatus.RUNNING, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED]:
                    if is_error:
                        err_message = self.errors[-1] if self.errors else 'Unknown error'
                        logger.error(f"Halting render due to error: {err_message}")
                        self.status = RenderStatus.ERROR
                    else:
                        self.status = RenderStatus.CANCELLED
                # Zero attempts left makes the retry loop in run() exit.
                self.maximum_attempts = 0
                process = psutil.Process(self.__process.pid)
                for proc in process.children(recursive=True):
                    proc.kill()
                process.kill()
            except Exception as e:
                # Best effort: the process may already have exited.
                logger.debug(f"Error stopping the process: {e}")

    def percent_complete(self):
        """Return render progress; the base class always returns 0."""
        return 0

    def _parse_stdout(self, line):
        """Handle one stripped line of renderer output; subclasses must implement."""
        raise NotImplementedError("_parse_stdout not implemented")

    def time_elapsed(self):
        """Return the render duration as ``HH:MM:SS``, or ``"Unknown"``.

        Uses ``end_time - start_time`` when the render has ended, or the time
        since ``start_time`` while the status is RUNNING. Hours are not
        wrapped at 24, so renders longer than a day report e.g. ``25:00:00``.
        """
        start_time = self.start_time
        if not start_time:
            return "Unknown"

        if self.end_time:
            elapsed = self.end_time - start_time
        elif self.status == RenderStatus.RUNNING:
            elapsed = datetime.now() - start_time
        else:
            return "Unknown"

        # total_seconds() includes whole days; timedelta.seconds would drop them.
        total_seconds = max(int(elapsed.total_seconds()), 0)
        hours, rem = divmod(total_seconds, 3600)
        minutes, seconds = divmod(rem, 60)
        return '{:02d}:{:02d}:{:02d}'.format(hours, minutes, seconds)

    def json(self):
        """Return the worker's state as a JSON-compatible dict.

        Built from the instance ``__dict__`` plus ``percent_complete`` and
        ``time_elapsed``, with ``status`` replaced by its string value. Keys
        named ``thread``/``process`` and keys starting with ``_`` are omitted.
        Datetimes become ISO-8601 strings; other values that are not JSON
        serializable become null.
        """
        worker_data = self.__dict__.copy()
        worker_data['percent_complete'] = self.percent_complete()
        worker_data['time_elapsed'] = self.time_elapsed()
        worker_data['status'] = self.status.value

        # remove unwanted keys from dict
        excluded_keys = ('thread', 'process')
        worker_data = {key: value for key, value in worker_data.items()
                       if key not in excluded_keys and not key.startswith('_')}

        # convert to json and back to auto-convert dates to iso format
        def date_serializer(o):
            if isinstance(o, datetime):
                return o.isoformat()
            return None  # anything else unserializable is emitted as null

        json_convert = json.dumps(worker_data, default=date_serializer)
        worker_json = json.loads(json_convert)
        return worker_json


def timecode_to_frames(timecode, frame_rate):
    """Convert a colon-separated timecode to a frame number.

    Args:
        timecode: String of integer fields separated by ``:``. The first three
            fields are combined as hours, minutes and seconds and the last
            field is added as a frame count (presumably ``HH:MM:SS:FF`` -
            confirm with callers).
        frame_rate: Frames per second.

    Returns:
        ``total_seconds * frame_rate + last_field + 1``.
    """
    e = [int(x) for x in timecode.split(':')]
    # Hours must be scaled to minutes before converting to seconds.
    seconds = ((e[0] * 60) + e[1]) * 60 + e[2]
    # NOTE(review): the +1 presumably makes the result 1-based - confirm.
    frames = (seconds * frame_rate) + e[-1] + 1
    return frames