#!/usr/bin/env python3
import io
import json
import logging
import os
import subprocess
import threading
import time
import zipfile
from datetime import datetime
from enum import Enum

import psutil
from sqlalchemy import Column, Integer, String, DateTime, JSON
from sqlalchemy.ext.declarative import declarative_base

from lib.utilities.misc_helper import get_time_elapsed

logger = logging.getLogger()
Base = declarative_base()


class RenderStatus(Enum):
    NOT_STARTED = "not_started"
    RUNNING = "running"
    COMPLETED = "completed"
    CANCELLED = "cancelled"
    ERROR = "error"
    SCHEDULED = "scheduled"
    WAITING = "waiting"
    NOT_READY = "not_ready"
    UNDEFINED = "undefined"


def string_to_status(string):
    for stat in RenderStatus:
        if stat.value == string:
            return stat
    return RenderStatus.UNDEFINED


class BaseRenderWorker(Base):
    __tablename__ = 'render_workers'
    id = Column(String, primary_key=True)
    input_path = Column(String)
    output_path = Column(String)
    date_created = Column(DateTime)
    start_time = Column(DateTime, nullable=True)
    end_time = Column(DateTime, nullable=True)
    renderer = Column(String)
    renderer_version = Column(String)
    priority = Column(Integer)
    project_length = Column(Integer)
    start_frame = Column(Integer)
    end_frame = Column(Integer, nullable=True)
    parent = Column(String, nullable=True)
    children = Column(JSON)
    name = Column(String)
    file_hash = Column(String)
    _status = Column(String)

    engine = None

    def __init__(self, input_path, output_path, priority=2, args=None, ignore_extensions=True,
                 parent=None, name=None):
        # Check the engine first so a missing engine fails with the intended error
        # rather than an AttributeError from the extension check below.
        if not self.engine:
            raise NotImplementedError("Engine not defined")

        if not ignore_extensions:
            if not any(ext in input_path for ext in self.engine.supported_extensions):
                err_msg = f'Cannot find valid project with supported file extension for {self.engine.name()} renderer'
                logger.error(err_msg)
                raise ValueError(err_msg)

        def generate_id():
            import uuid
            return str(uuid.uuid4()).split('-')[0]

        # Essential Info
        self.id = generate_id()
        self.input_path = input_path
        self.output_path = output_path
        self.args = args or {}
        self.date_created = datetime.now()
        self.renderer = self.engine.name()
        self.renderer_version = self.engine.version()
        self.priority = priority
        self.parent = parent
        self.children = {}
        self.name = name or os.path.basename(input_path)

        # Frame Ranges
        self.project_length = -1
        self.current_frame = 0  # should this be a 1 ?
        self.start_frame = 0  # should this be a 1 ?
        self.end_frame = None

        # Logging
        self.start_time = None
        self.end_time = None

        # History
        self.status = RenderStatus.NOT_READY
        self.warnings = []
        self.errors = []

        # Threads and processes
        self.__thread = threading.Thread(target=self.run, args=())
        self.__thread.daemon = True
        self.__process = None
        self.last_output = None

    @property
    def total_frames(self):
        return (self.end_frame or self.project_length) - self.start_frame + 1

    @property
    def status(self):
        # A job still marked running/not-started that has lost its in-memory state
        # (e.g. an instance restored from the database) is treated as cancelled.
        if self._status in [RenderStatus.RUNNING.value, RenderStatus.NOT_STARTED.value]:
            if not hasattr(self, 'errors'):
                self._status = RenderStatus.CANCELLED.value
        return string_to_status(self._status)

    @status.setter
    def status(self, value):
        self._status = value.value

    def validate(self):
        if not os.path.exists(self.input_path):
            raise FileNotFoundError(f"Cannot find input path: {self.input_path}")
        self.generate_subprocess()

    def generate_subprocess(self):
        # Convert raw args from string if available and catch conflicts
        generated_args = [str(x) for x in self.generate_worker_subprocess()]
        generated_args_flags = [x for x in generated_args if x.startswith('-')]
        if len(generated_args_flags) != len(set(generated_args_flags)):
            msg = "Cannot generate subprocess - Multiple arg conflicts detected"
            logger.error(msg)
            logger.debug(f"Generated args for subprocess: {generated_args}")
            raise ValueError(msg)
        return generated_args

    def get_raw_args(self):
        raw_args_string = self.args.get('raw', None)
        raw_args = None
        if raw_args_string:
            import shlex
            raw_args = shlex.split(raw_args_string)
        return raw_args

    def generate_worker_subprocess(self):
        raise NotImplementedError("generate_worker_subprocess not implemented")

    def log_path(self):
        filename = (self.name or os.path.basename(self.input_path)) + '_' + \
                   self.date_created.strftime("%Y.%m.%d_%H.%M.%S") + '.log'
        return os.path.join(os.path.dirname(self.input_path), filename)

    def start(self):
        if self.status not in [RenderStatus.SCHEDULED, RenderStatus.NOT_STARTED]:
            logger.error(f"Trying to start job with status: {self.status}")
            return

        if not os.path.exists(self.input_path):
            self.status = RenderStatus.ERROR
            msg = 'Cannot find input path: {}'.format(self.input_path)
            logger.error(msg)
            self.errors.append(msg)
            return

        if not self.engine.renderer_path():
            self.status = RenderStatus.ERROR
            msg = 'Cannot find render engine path for {}'.format(self.engine.name())
            logger.error(msg)
            self.errors.append(msg)
            return

        self.status = RenderStatus.RUNNING
        self.start_time = datetime.now()
        logger.info(f'Starting {self.engine.name()} {self.engine.version()} Render for {self.input_path} | '
                    f'Frame Count: {self.total_frames}')
        self.__thread.start()

    def run(self):
        # Setup logging
        log_dir = os.path.dirname(self.log_path())
        os.makedirs(log_dir, exist_ok=True)

        # Start process and get updates
        subprocess_cmds = self.generate_subprocess()
        logger.debug("Renderer commands generated - {}".format(" ".join(subprocess_cmds)))
        self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                                          universal_newlines=False)

        with open(self.log_path(), "a") as f:
            f.write(f"{self.start_time.isoformat()} - Starting {self.engine.name()} {self.engine.version()} "
                    f"Render for {self.input_path}\n")
            f.write(f"Running command: {' '.join(subprocess_cmds)}\n")
            for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8"):  # or another encoding
                f.write(c)
                logger.debug(f"{self.engine.name()}Worker: {c.strip()}")
                self.last_output = c.strip()
                self._parse_stdout(c.strip())
            f.write('\n')

            # Check return codes
            return_code = self.__process.wait()
            self.end_time = datetime.now()

            # Return early if job was cancelled
            if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
                return

            if return_code:
                message = f"{self.engine.name()} render failed with return_code {return_code} after {self.time_elapsed()}"
                logger.error(message)
                f.write(message)
                self.status = RenderStatus.ERROR
                if not self.errors:
                    self.errors = [message]
                return

            message = f"{self.engine.name()} render completed successfully in {self.time_elapsed()}"
            logger.info(message)
            f.write(message)

        from lib.server.server_proxy import RenderServerProxy

        # Wait on children jobs, if necessary
        if self.children:
            self.status = RenderStatus.WAITING
            subjobs_still_running = self.children.copy()
            while len(subjobs_still_running):
                for hostname, job_id in subjobs_still_running.copy().items():
                    proxy = RenderServerProxy(hostname)
                    response = proxy.get_job_info(job_id)
                    if not response:
                        logger.warning(f"No response from: {hostname}")
                    else:
                        status = string_to_status(response.get('status', ''))
                        status_msg = (f"Subjob {job_id}@{hostname} | Status: {status} | "
                                      f"{response.get('percent_complete')}%")
                        if status in [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED]:
                            logger.info(f"Downloading completed subjob files from {hostname} to localhost")
                            try:
                                zip_file_path = self.output_path + f'_{hostname}_{job_id}.zip'
                                proxy.get_job_files(job_id, zip_file_path)
                                logger.debug("Zip file downloaded successfully - Preparing to unzip.")
                                extract_path = os.path.dirname(zip_file_path)
                                with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
                                    zip_ref.extractall(extract_path)
                                logger.info(f"Successfully extracted zip to: {extract_path}")
                                os.remove(zip_file_path)
                            except Exception as e:
                                err_msg = f"Error transferring output from subjob {job_id}@{hostname}: {e}"
                                logger.exception(err_msg)
                                self.errors.append(err_msg)
                            finally:
                                subjobs_still_running.pop(hostname)
                        else:
                            logger.debug(status_msg)
                logger.debug(f"Waiting on {len(subjobs_still_running)} subjobs on "
                             f"{', '.join(list(subjobs_still_running.keys()))}")
                time.sleep(5)
            logger.info("All subjobs complete")

        # Post Render Work
        logger.debug("Starting post-processing work")
        self.post_processing()
        self.status = RenderStatus.COMPLETED

    def post_processing(self):
        pass

    def is_running(self):
        if self.__thread:
            return self.__thread.is_alive()
        return False

    def log_error(self, error_line, halt_render=False):
        logger.error(error_line)
        self.errors.append(error_line)
        if halt_render:
            self.stop(is_error=True)

    def stop(self, is_error=False):
        # Name mangling stores the process handle as _BaseRenderWorker__process;
        # it may be unset on instances restored from the database.
        if getattr(self, '_BaseRenderWorker__process', None):
            try:
                process = psutil.Process(self.__process.pid)
                for proc in process.children(recursive=True):
                    proc.kill()
                process.kill()
            except Exception as e:
                logger.debug(f"Error stopping the process: {e}")

        if self.status in [RenderStatus.RUNNING, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED]:
            if is_error:
                err_message = self.errors[-1] if self.errors else 'Unknown error'
                logger.error(f"Halting render due to error: {err_message}")
                self.status = RenderStatus.ERROR
            else:
                self.status = RenderStatus.CANCELLED

    def percent_complete(self):
        return 0

    def _parse_stdout(self, line):
        raise NotImplementedError("_parse_stdout not implemented")

    def time_elapsed(self):
        return get_time_elapsed(self.start_time, self.end_time)

    def file_list(self):
        try:
            job_dir = os.path.dirname(self.output_path)
            file_list = [os.path.join(job_dir, file) for file in os.listdir(job_dir)]
            file_list.sort()
            return file_list
        except FileNotFoundError:
            return []

    def json(self):
        job_dict = {
            'id': self.id,
            'name': self.name,
            'input_path': self.input_path,
            'output_path': self.output_path,
            'priority': self.priority,
            'parent': self.parent,
            'children': self.children,
            'date_created': self.date_created,
            'start_time': self.start_time,
            'end_time': self.end_time,
            'status': self.status.value,
            'file_hash': self.file_hash,
            'percent_complete': self.percent_complete(),
            'file_count': len(self.file_list()),
            'renderer': self.renderer,
            'renderer_version': self.renderer_version,
            'errors': getattr(self, 'errors', None),
            'start_frame': self.start_frame,
            'end_frame': self.end_frame,
            'total_frames': self.total_frames,
            'last_output': getattr(self, 'last_output', None),
            'log_path': self.log_path()
        }

        # Convert to JSON and back to auto-convert dates to ISO format
        def date_serializer(o):
            if isinstance(o, datetime):
                return o.isoformat()

        json_convert = json.dumps(job_dict, default=date_serializer)
        worker_json = json.loads(json_convert)
        return worker_json


def timecode_to_frames(timecode, frame_rate):
    # Assumes an HH:MM:SS:FF timecode and returns a 1-based frame count.
    e = [int(x) for x in timecode.split(':')]
    seconds = ((e[0] * 60) + e[1]) * 60 + e[2]
    frames = (seconds * frame_rate) + e[-1] + 1
    return frames
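

# --- Usage sketch (illustrative only) -------------------------------------------
# BaseRenderWorker is abstract: a subclass supplies an `engine` object (providing
# name(), version(), renderer_path(), and supported_extensions) and implements
# generate_worker_subprocess() and _parse_stdout(). The names below (FakeEngine,
# FakeRenderWorker, the '-o' flag) are hypothetical and only show the expected
# shape of a subclass; they are not part of this module.
#
#     class FakeRenderWorker(BaseRenderWorker):
#         engine = FakeEngine()  # hypothetical engine wrapper
#
#         def generate_worker_subprocess(self):
#             # Return the full command line as a list of arguments.
#             return [self.engine.renderer_path(), '-o', self.output_path, self.input_path]
#
#         def _parse_stdout(self, line):
#             # Update progress from renderer output, e.g. set self.current_frame.
#             pass
#
#     worker = FakeRenderWorker('/path/to/project', '/path/to/output')
#     worker.status = RenderStatus.NOT_STARTED
#     worker.start()           # launches the render in a background thread
#     print(worker.json())     # JSON-serializable job summary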