Files
Zordon/lib/render_workers/base_worker.py
2023-05-27 14:40:34 -05:00

284 lines
10 KiB
Python

#!/usr/bin/env python3
import io
import logging
import os
import subprocess
import threading
import json
from datetime import datetime
from enum import Enum
import psutil
logger = logging.getLogger()
class RenderStatus(Enum):
NOT_STARTED = "not_started"
RUNNING = "running"
COMPLETED = "completed"
CANCELLED = "cancelled"
ERROR = "error"
SCHEDULED = "scheduled"
UNDEFINED = "undefined"
def string_to_status(string):
for stat in RenderStatus:
if stat.value == string:
return stat
return RenderStatus.ERROR
class BaseRenderWorker(object):
engine = None
def __init__(self, input_path, output_path, args=None, ignore_extensions=True):
if not ignore_extensions:
if not any(ext in input_path for ext in self.engine.supported_extensions):
err_meg = f'Cannot find valid project with supported file extension for {self.engine.name()} renderer'
logger.error(err_meg)
raise ValueError(err_meg)
if not self.engine:
raise NotImplementedError("Engine not defined")
# Essential Info
self.input_path = input_path
self.output_path = output_path
self.args = args or {}
self.date_created = datetime.now()
self.renderer_version = self.engine.version()
# Frame Ranges
self.total_frames = 0
self.current_frame = 0
# Logging
self.log_path = None
self.start_time = None
self.end_time = None
# History
self.status = RenderStatus.NOT_STARTED
self.warnings = []
self.errors = []
self.halt_on_errors = True
self.failed_attempts = 0
self.maximum_attempts = 1
# Threads and processes
self.__thread = threading.Thread(target=self.run, args=())
self.__thread.daemon = True
self.__process = None
self.is_finished = False
self.last_output = None
def validate(self):
if not os.path.exists(self.input_path):
raise FileNotFoundError(f"Cannot find input path: {self.input_path}")
self.generate_subprocess()
def generate_subprocess(self):
# Convert raw args from string if available and catch conflicts
generated_args = self.generate_worker_subprocess()
generated_args_flags = [x for x in generated_args if x.startswith('-')]
if len(generated_args_flags) != len(set(generated_args_flags)):
msg = "Cannot generate subprocess - Multiple arg conflicts detected"
logger.error(msg)
logger.debug(f"Generated args for subprocess: {generated_args}")
raise ValueError(msg)
return generated_args
def get_raw_args(self):
raw_args_string = self.args.get('raw', None)
raw_args = None
if raw_args_string:
import shlex
raw_args = shlex.split(raw_args_string)
return raw_args
def generate_worker_subprocess(self):
raise NotImplementedError("generate_worker_subprocess not implemented")
def start(self):
if not os.path.exists(self.input_path):
self.status = RenderStatus.ERROR
msg = 'Cannot find input path: {}'.format(self.input_path)
logger.error(msg)
self.errors.append(msg)
return
if not self.engine.renderer_path():
self.status = RenderStatus.ERROR
msg = 'Cannot find render engine path for {}'.format(self.engine.name())
logger.error(msg)
self.errors.append(msg)
return
self.status = RenderStatus.RUNNING
logger.info(f'Starting {self.engine.name()} {self.engine.version()} Render for {self.input_path}')
self.__thread.start()
def run(self):
# Setup logging
try:
if not self.log_path:
log_dir = os.path.join(os.path.dirname(self.input_path), 'logs')
if not os.path.exists(log_dir):
os.makedirs(log_dir)
self.log_path = os.path.join(log_dir, os.path.basename(self.input_path)) + '.log'
logger.info('Logs saved in {}'.format(self.log_path))
except Exception as e:
logger.error("Error setting up logging: {}".format(e))
while self.failed_attempts < self.maximum_attempts and self.status is not RenderStatus.COMPLETED:
if self.failed_attempts:
logger.info(f'Attempt #{self.failed_attempts} failed. Starting attempt #{self.failed_attempts + 1}')
# Start process and get updates
subprocess_cmds = self.generate_subprocess()
logger.debug("Renderer commands generated - {}".format(" ".join(subprocess_cmds)))
self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
universal_newlines=False)
self.start_time = datetime.now()
with open(self.log_path, "a") as f:
f.write(f"{self.start_time.isoformat()} - Starting {self.engine.name()} {self.engine.version()} "
f"Render for {self.input_path}")
f.write(f"Running command: {' '.join(subprocess_cmds)}\n")
for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8"): # or another encoding
f.write(c)
logger.debug(f"{self.engine.name()}Worker: {c.strip()}")
self.last_output = c.strip()
self._parse_stdout(c.strip())
f.write('\n')
# Check return codes
return_code = self.__process.wait()
self.end_time = datetime.now()
# Return early if job was cancelled
if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
self.is_finished = True
return
duration = self.end_time - self.start_time
if return_code:
message = f"{self.engine.name()} render failed with return_code {return_code} after {duration}"
logger.error(message)
self.failed_attempts = self.failed_attempts + 1
else:
message = f"{self.engine.name()} render completed successfully in {duration}"
logger.info(message)
self.status = RenderStatus.COMPLETED
f.write(message)
if self.failed_attempts >= self.maximum_attempts and self.status is not RenderStatus.CANCELLED:
logger.error('{} Render of {} failed after {} attempts'.format(self.engine.name(), self.input_path,
self.failed_attempts))
self.status = RenderStatus.ERROR
if not self.errors:
self.errors = [self.last_output]
self.is_finished = True
def is_running(self):
if self.__thread:
return self.__thread.is_alive()
return False
def log_error(self, error_line):
logger.error(error_line)
self.errors.append(error_line)
if self.halt_on_errors:
self.stop(is_error=True)
def stop(self, is_error=False):
if self.__process:
try:
if self.status in [RenderStatus.RUNNING, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED]:
if is_error:
err_message = self.errors[-1] if self.errors else 'Unknown error'
logger.error(f"Halting render due to error: {err_message}")
self.status = RenderStatus.ERROR
else:
self.status = RenderStatus.CANCELLED
self.maximum_attempts = 0
process = psutil.Process(self.__process.pid)
for proc in process.children(recursive=True):
proc.kill()
process.kill()
except Exception as e:
logger.debug(f"Error stopping the process: {e}")
def percent_complete(self):
return 0
def _parse_stdout(self, line):
raise NotImplementedError("_parse_stdout not implemented")
def time_elapsed(self):
from string import Template
class DeltaTemplate(Template):
delimiter = "%"
def strfdelta(tdelta, fmt='%H:%M:%S'):
d = {"D": tdelta.days}
hours, rem = divmod(tdelta.seconds, 3600)
minutes, seconds = divmod(rem, 60)
d["H"] = '{:02d}'.format(hours)
d["M"] = '{:02d}'.format(minutes)
d["S"] = '{:02d}'.format(seconds)
t = DeltaTemplate(fmt)
return t.substitute(**d)
# calculate elapsed time
elapsed_time = None
start_time = self.start_time
end_time = self.end_time
if start_time:
if end_time:
elapsed_time = end_time - start_time
elif self.status == RenderStatus.RUNNING:
elapsed_time = datetime.now() - start_time
elapsed_time_string = strfdelta(elapsed_time) if elapsed_time else "Unknown"
return elapsed_time_string
def json(self):
worker_data = self.__dict__.copy()
worker_data['percent_complete'] = self.percent_complete()
worker_data['time_elapsed'] = self.time_elapsed()
worker_data['status'] = self.status.value
keys_to_remove = ['thread', 'process'] # remove unwanted keys from dict
for key in worker_data.keys():
if key.startswith('_'):
keys_to_remove.append(key)
for key in keys_to_remove:
worker_data.pop(key, None)
# convert to json and back to auto-convert dates to iso format
def date_serializer(o):
if isinstance(o, datetime):
return o.isoformat()
json_convert = json.dumps(worker_data, default=date_serializer)
worker_json = json.loads(json_convert)
return worker_json
def timecode_to_frames(timecode, frame_rate):
e = [int(x) for x in timecode.split(':')]
seconds = (((e[0] * 60) + e[1] * 60) + e[2])
frames = (seconds * frame_rate) + e[-1] + 1
return frames