#!/usr/bin/env python3
import glob
import io
import json
import logging
import os
import subprocess
import threading
from datetime import datetime
from enum import Enum

import psutil
from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.ext.declarative import declarative_base

from lib.utilities.misc_helper import get_time_elapsed

logger = logging.getLogger()
Base = declarative_base()


class RenderStatus(Enum):
    NOT_STARTED = "not_started"
    RUNNING = "running"
    COMPLETED = "completed"
    CANCELLED = "cancelled"
    ERROR = "error"
    SCHEDULED = "scheduled"
    UNDEFINED = "undefined"


def string_to_status(string):
    for stat in RenderStatus:
        if stat.value == string:
            return stat
    return RenderStatus.UNDEFINED


class BaseRenderWorker(Base):
    __tablename__ = 'render_workers'

    id = Column(String, primary_key=True)
    input_path = Column(String)
    output_path = Column(String)
    date_created = Column(DateTime)
    start_time = Column(DateTime, nullable=True)
    end_time = Column(DateTime, nullable=True)
    renderer = Column(String)
    renderer_version = Column(String)
    priority = Column(Integer)
    project_length = Column(Integer)
    start_frame = Column(Integer)
    end_frame = Column(Integer, nullable=True)
    owner = Column(String)
    client = Column(String)
    name = Column(String)
    file_hash = Column(String)
    _status = Column(String)

    # Subclasses must set this to a render engine adapter (provides name(),
    # version(), renderer_path() and supported_extensions).
    engine = None

    def __init__(self, input_path, output_path, priority=2, args=None, ignore_extensions=True,
                 owner=None, client=None, name=None):
        # Check the engine first so a missing engine fails with a clear error
        # before any engine attribute is accessed.
        if not self.engine:
            raise NotImplementedError("Engine not defined")

        if not ignore_extensions:
            if not any(ext in input_path for ext in self.engine.supported_extensions):
                err_msg = f'Cannot find valid project with supported file extension for {self.engine.name()} renderer'
                logger.error(err_msg)
                raise ValueError(err_msg)

        def generate_id():
            import uuid
            return str(uuid.uuid4()).split('-')[0]

        # Essential Info
        self.id = generate_id()
        self.input_path = input_path
        self.output_path = output_path
        self.args = args or {}
        self.date_created = datetime.now()
        self.renderer = self.engine.name()
        self.renderer_version = self.engine.version()
        self.priority = priority
        self.owner = owner
        self.client = client
        self.name = name

        # Frame Ranges
        self.project_length = -1
        self.current_frame = 0  # should this be a 1 ?
        self.start_frame = 0  # should this be a 1 ?
        self.end_frame = None

        # Logging
        self.start_time = None
        self.end_time = None

        # History
        self.status = RenderStatus.NOT_STARTED
        self.warnings = []
        self.errors = []
        self.failed_attempts = 0
        self.maximum_attempts = 1

        # Threads and processes
        self.__thread = threading.Thread(target=self.run, args=())
        self.__thread.daemon = True
        self.__process = None
        self.is_finished = False
        self.last_output = None

    @property
    def total_frames(self):
        return (self.end_frame or self.project_length) - self.start_frame + 1

    @property
    def status(self):
        # Workers restored from the database have no in-memory error list; a row
        # still marked running/not started therefore belongs to a job that never
        # finished cleanly, so report it as cancelled.
        if self._status in [RenderStatus.RUNNING.value, RenderStatus.NOT_STARTED.value]:
            if not hasattr(self, 'errors'):
                self._status = RenderStatus.CANCELLED.value
                return RenderStatus.CANCELLED
        return string_to_status(self._status)

    @status.setter
    def status(self, value):
        self._status = value.value

    def validate(self):
        if not os.path.exists(self.input_path):
            raise FileNotFoundError(f"Cannot find input path: {self.input_path}")
        self.generate_subprocess()

    def generate_subprocess(self):
        # Convert raw args from string if available and catch conflicts
        generated_args = [str(x) for x in self.generate_worker_subprocess()]
        generated_args_flags = [x for x in generated_args if x.startswith('-')]
        if len(generated_args_flags) != len(set(generated_args_flags)):
            msg = "Cannot generate subprocess - Multiple arg conflicts detected"
            logger.error(msg)
            logger.debug(f"Generated args for subprocess: {generated_args}")
            raise ValueError(msg)
        return generated_args

    def get_raw_args(self):
        raw_args_string = self.args.get('raw', None)
        raw_args = None
        if raw_args_string:
            import shlex
            raw_args = shlex.split(raw_args_string)
        return raw_args

    def generate_worker_subprocess(self):
        raise NotImplementedError("generate_worker_subprocess not implemented")

    def log_path(self):
        filename = (self.name or os.path.basename(self.input_path)) + '_' + \
                   self.date_created.strftime("%Y.%m.%d_%H.%M.%S") + '.log'
        return os.path.join(os.path.dirname(self.input_path), filename)

    def start(self):
        if self.status not in [RenderStatus.SCHEDULED, RenderStatus.NOT_STARTED]:
            logger.error(f"Trying to start job with status: {self.status}")
            return

        if not os.path.exists(self.input_path):
            self.status = RenderStatus.ERROR
            msg = 'Cannot find input path: {}'.format(self.input_path)
            logger.error(msg)
            self.errors.append(msg)
            return

        if not self.engine.renderer_path():
            self.status = RenderStatus.ERROR
            msg = 'Cannot find render engine path for {}'.format(self.engine.name())
            logger.error(msg)
            self.errors.append(msg)
            return

        self.status = RenderStatus.RUNNING
        logger.info(f'Starting {self.engine.name()} {self.engine.version()} Render for {self.input_path} | '
                    f'Frame Count: {self.total_frames}')
        self.__thread.start()

    def run(self):
        # Setup logging
        log_dir = os.path.dirname(self.log_path())
        os.makedirs(log_dir, exist_ok=True)

        while self.failed_attempts < self.maximum_attempts and self.status is not RenderStatus.COMPLETED:
            if self.failed_attempts:
                logger.info(f'Attempt #{self.failed_attempts} failed. Starting attempt #{self.failed_attempts + 1}')

            # Start process and get updates
            subprocess_cmds = self.generate_subprocess()
            logger.debug("Renderer commands generated - {}".format(" ".join(subprocess_cmds)))
            self.__process = subprocess.Popen(subprocess_cmds,
                                              stdout=subprocess.PIPE,
                                              stderr=subprocess.STDOUT,
                                              universal_newlines=False)
            self.start_time = datetime.now()

            with open(self.log_path(), "a") as f:
                f.write(f"{self.start_time.isoformat()} - Starting {self.engine.name()} {self.engine.version()} "
                        f"Render for {self.input_path}\n")
                f.write(f"Running command: {' '.join(subprocess_cmds)}\n")
                for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8"):  # or another encoding
                    f.write(c)
                    logger.debug(f"{self.engine.name()}Worker: {c.strip()}")
                    self.last_output = c.strip()
                    self._parse_stdout(c.strip())
                f.write('\n')

                # Check return codes
                return_code = self.__process.wait()
                self.end_time = datetime.now()

                # Return early if job was cancelled
                if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
                    self.is_finished = True
                    return

                if return_code:
                    message = f"{self.engine.name()} render failed with return_code {return_code} after {self.time_elapsed()}"
                    logger.error(message)
                    self.failed_attempts = self.failed_attempts + 1
                else:
                    message = f"{self.engine.name()} render completed successfully in {self.time_elapsed()}"
                    logger.info(message)
                    self.status = RenderStatus.COMPLETED
                f.write(message)

        if self.failed_attempts >= self.maximum_attempts and self.status is not RenderStatus.CANCELLED:
            logger.error('{} Render of {} failed after {} attempts'.format(self.engine.name(), self.input_path,
                                                                           self.failed_attempts))
            self.status = RenderStatus.ERROR
            if not self.errors:
                self.errors = [self.last_output]

        self.is_finished = True
        self.post_processing()

    def post_processing(self):
        pass

    def is_running(self):
        if self.__thread:
            return self.__thread.is_alive()
        return False

    def log_error(self, error_line, halt_render=False):
        logger.error(error_line)
        self.errors.append(error_line)
        if halt_render:
            self.stop(is_error=True)

    def stop(self, is_error=False):
        # Name mangling means hasattr(self, '__process') can never match inside the
        # class, so guard with the mangled attribute name instead.
        if getattr(self, '_BaseRenderWorker__process', None):
            try:
                self.maximum_attempts = 0
                process = psutil.Process(self.__process.pid)
                for proc in process.children(recursive=True):
                    proc.kill()
                process.kill()
            except Exception as e:
                logger.debug(f"Error stopping the process: {e}")

        if self.status in [RenderStatus.RUNNING, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED]:
            if is_error:
                err_message = self.errors[-1] if self.errors else 'Unknown error'
                logger.error(f"Halting render due to error: {err_message}")
                self.status = RenderStatus.ERROR
            else:
                self.status = RenderStatus.CANCELLED

    def percent_complete(self):
        return 0

    def _parse_stdout(self, line):
        raise NotImplementedError("_parse_stdout not implemented")

    def time_elapsed(self):
        return get_time_elapsed(self.start_time, self.end_time)

    def file_list(self):
        job_dir = os.path.dirname(self.output_path)
        file_list = glob.glob(os.path.join(job_dir, '*'))
        file_list.sort()
        return file_list

    def json(self):
        job_dict = {
            'id': self.id,
            'name': self.name,
            'input_path': self.input_path,
            'output_path': self.output_path,
            'priority': self.priority,
            'owner': self.owner,
            'client': self.client,
            'date_created': self.date_created,
            'start_time': self.start_time,
            'end_time': self.end_time,
            'status': self.status.value,
            'file_hash': self.file_hash,
            'percent_complete': self.percent_complete(),
            'file_count': len(self.file_list()),
            'renderer': self.renderer,
            'renderer_version': self.renderer_version,
            'errors': getattr(self, 'errors', None),
            'total_frames': self.total_frames,
            'last_output': getattr(self, 'last_output', None),
            'log_path': self.log_path()
        }

        # Convert to JSON and back to auto-convert dates to ISO format
        def date_serializer(o):
            if isinstance(o, datetime):
                return o.isoformat()

        json_convert = json.dumps(job_dict, default=date_serializer)
        worker_json = json.loads(json_convert)
        return worker_json


def timecode_to_frames(timecode, frame_rate):
    # Convert an HH:MM:SS:FF timecode into a 1-based frame number
    # (hours contribute 3600 seconds, minutes 60).
    e = [int(x) for x in timecode.split(':')]
    seconds = ((e[0] * 60) + e[1]) * 60 + e[2]
    frames = (seconds * frame_rate) + e[-1] + 1
    return frames
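
# Illustrative sketch (not part of the production code): a concrete worker is
# expected to provide an ``engine`` adapter and implement
# generate_worker_subprocess() and _parse_stdout(). The engine class, binary
# path, flags, and output format below are hypothetical assumptions, shown
# only to clarify how the hooks fit together.
#
#     class ExampleEngine:
#         supported_extensions = ['.example']
#
#         @classmethod
#         def name(cls):
#             return 'ExampleRenderer'
#
#         @classmethod
#         def version(cls):
#             return '1.0'
#
#         @classmethod
#         def renderer_path(cls):
#             return '/usr/local/bin/example_renderer'  # hypothetical binary
#
#
#     class ExampleRenderWorker(BaseRenderWorker):
#         engine = ExampleEngine
#
#         def generate_worker_subprocess(self):
#             # Build the command line; flags must be unique or
#             # generate_subprocess() raises ValueError.
#             return [self.engine.renderer_path(),
#                     '-project', self.input_path,
#                     '-output', self.output_path] + (self.get_raw_args() or [])
#
#         def _parse_stdout(self, line):
#             # Track progress from renderer output, e.g. "Frame 12 rendered".
#             if line.startswith('Frame '):
#                 self.current_frame = int(line.split()[1])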