Files
Zordon/lib/render_workers/base_worker.py
2023-05-30 22:30:18 -05:00

328 lines
11 KiB
Python

#!/usr/bin/env python3
import io
import logging
import os
import subprocess
import threading
import json
from datetime import datetime
from enum import Enum
from sqlalchemy import Column, Integer, String, DateTime, JSON, event
from sqlalchemy.ext.declarative import declarative_base
import psutil
# Module-wide logger; getLogger() called without a name returns the root logger.
logger = logging.getLogger()
# Declarative base class that the ORM-mapped worker model in this module inherits from.
Base = declarative_base()
class RenderStatus(Enum):
    """Lifecycle states of a render job.

    Each member's value is the lowercase string form of the state; that
    string is what gets stored when a status is assigned to a worker.
    """

    # Job exists but has not been started yet.
    NOT_STARTED = "not_started"
    # Renderer subprocess is (or is about to be) running.
    RUNNING = "running"
    # Renderer exited with a zero return code.
    COMPLETED = "completed"
    # Job was stopped on request.
    CANCELLED = "cancelled"
    # Job failed or was halted because of an error.
    ERROR = "error"
    SCHEDULED = "scheduled"
    # Fallback used when a stored string matches no known state.
    UNDEFINED = "undefined"
def string_to_status(string):
    """Return the RenderStatus whose value equals *string*.

    Falls back to ``RenderStatus.UNDEFINED`` when no member's value matches
    (including when *string* is ``None``).
    """
    matches = (candidate for candidate in RenderStatus if candidate.value == string)
    return next(matches, RenderStatus.UNDEFINED)
class BaseRenderWorker(Base):
    """Base class for a render job that drives an external renderer process.

    The class is an ORM model (table ``render_workers``) and also owns the
    background thread and subprocess used to run the render. Subclasses are
    expected to set ``engine`` and to implement
    ``generate_worker_subprocess`` and ``_parse_stdout``.
    """

    __tablename__ = 'render_workers'

    # Persisted columns
    id = Column(Integer, primary_key=True)
    input_path = Column(String)
    output_path = Column(String)
    date_created = Column(DateTime)
    start_time = Column(DateTime)
    end_time = Column(DateTime)
    renderer = Column(String)
    renderer_version = Column(String)
    priority = Column(Integer)
    owner = Column(String)
    client = Column(String)
    name = Column(String)
    file_hash = Column(String)
    # Stored as the string value of a RenderStatus; use the ``status`` property instead.
    _status = Column(String)

    # Renderer engine used by this worker; subclasses must override it.
    engine = None

    def __init__(self, input_path, output_path, priority=2, args=None, ignore_extensions=True, owner=None, client=None,
                 name=None):
        """Create a worker for *input_path* that renders to *output_path*.

        Args:
            input_path: Path of the project/file to render.
            output_path: Path the renderer should write to.
            priority: Job priority (stored as-is).
            args: Optional dict of extra arguments; ``args['raw']`` is read by ``get_raw_args``.
            ignore_extensions: When False, *input_path* must contain one of the engine's
                supported extensions.
            owner: Optional owner label.
            client: Optional client label.
            name: Optional display name.

        Raises:
            NotImplementedError: If the subclass did not define ``engine``.
            ValueError: If extensions are checked and none of them occurs in *input_path*.
        """
        # The engine is dereferenced by the extension check, so verify it first.
        if not self.engine:
            raise NotImplementedError("Engine not defined")

        if not ignore_extensions:
            # NOTE(review): this is a substring test, not a suffix test - confirm that is intended.
            if not any(ext in input_path for ext in self.engine.supported_extensions):
                err_msg = f'Cannot find valid project with supported file extension for {self.engine.name()} renderer'
                logger.error(err_msg)
                raise ValueError(err_msg)

        # Essential Info
        self.input_path = input_path
        self.output_path = output_path
        self.args = args or {}
        self.date_created = datetime.now()
        self.renderer = self.engine.name()
        self.renderer_version = self.engine.version()
        self.priority = priority
        self.owner = owner
        self.client = client
        self.name = name

        # Frame Ranges
        self.total_frames = 0
        self.current_frame = 0

        # Logging
        self.log_path = None
        self.start_time = None
        self.end_time = None

        # History
        self.status = RenderStatus.NOT_STARTED
        self.warnings = []
        self.errors = []
        self.failed_attempts = 0
        self.maximum_attempts = 1

        # Threads and processes
        self.__thread = threading.Thread(target=self.run, args=())
        self.__thread.daemon = True
        self.__process = None
        self.is_finished = False
        self.last_output = None

    @property
    def status(self):
        """Current status as a RenderStatus member (UNDEFINED if the stored string is unknown)."""
        return string_to_status(self._status)

    @status.setter
    def status(self, value):
        # Only the enum's string value is stored in the ``_status`` column.
        self._status = value.value

    def validate(self):
        """Check that the input exists and that a subprocess command can be generated.

        Raises:
            FileNotFoundError: If ``input_path`` does not exist.
            ValueError: If the generated arguments contain duplicate flags.
        """
        if not os.path.exists(self.input_path):
            raise FileNotFoundError(f"Cannot find input path: {self.input_path}")
        self.generate_subprocess()

    def generate_subprocess(self):
        """Return the subprocess argument list, rejecting duplicated flags.

        Raises:
            ValueError: If any argument starting with ``-`` appears more than once.
        """
        generated_args = self.generate_worker_subprocess()
        generated_args_flags = [x for x in generated_args if x.startswith('-')]
        # A flag appearing twice means two sources tried to set the same option.
        if len(generated_args_flags) != len(set(generated_args_flags)):
            msg = "Cannot generate subprocess - Multiple arg conflicts detected"
            logger.error(msg)
            logger.debug(f"Generated args for subprocess: {generated_args}")
            raise ValueError(msg)
        return generated_args

    def get_raw_args(self):
        """Return ``args['raw']`` split shell-style into a list, or None when it is absent/empty."""
        raw_args_string = self.args.get('raw', None)
        if not raw_args_string:
            return None
        import shlex
        return shlex.split(raw_args_string)

    def generate_worker_subprocess(self):
        """Build the renderer command line. Must be implemented by subclasses."""
        raise NotImplementedError("generate_worker_subprocess not implemented")

    def start(self):
        """Start the render on the background thread.

        If the input path or the renderer executable cannot be found, the
        status is set to ERROR, the message is recorded in ``errors`` and the
        thread is not started. A job that was cancelled before being started
        is left untouched.
        """
        if self.status is RenderStatus.CANCELLED:
            # stop() was called before the job ever ran; do not resurrect it.
            logger.info(f'Not starting cancelled render for {self.input_path}')
            return

        if not os.path.exists(self.input_path):
            self._abort_start('Cannot find input path: {}'.format(self.input_path))
            return

        if not self.engine.renderer_path():
            self._abort_start('Cannot find render engine path for {}'.format(self.engine.name()))
            return

        self.status = RenderStatus.RUNNING
        logger.info(f'Starting {self.engine.name()} {self.engine.version()} Render for {self.input_path}')
        self.__thread.start()

    def _abort_start(self, msg):
        """Mark the job as failed before launch and record *msg*."""
        self.status = RenderStatus.ERROR
        logger.error(msg)
        self.errors.append(msg)

    def _resolve_log_target(self):
        """Return the path render output is written to.

        Defaults ``log_path`` to ``<input dir>/logs/<input file name>.log``.
        If the log directory cannot be created the error is logged and
        ``os.devnull`` is returned so the render can still proceed.
        """
        try:
            if not self.log_path:
                log_dir = os.path.join(os.path.dirname(self.input_path), 'logs')
                os.makedirs(log_dir, exist_ok=True)
                self.log_path = os.path.join(log_dir, os.path.basename(self.input_path)) + '.log'
            logger.info('Logs saved in {}'.format(self.log_path))
        except OSError as e:
            logger.error("Error setting up logging: {}".format(e))
        return self.log_path or os.devnull

    def run(self):
        """Thread target: run the renderer, retrying up to ``maximum_attempts`` times.

        Every line of renderer output is written to the log file, stored in
        ``last_output`` and passed to ``_parse_stdout``. On a zero return code
        the status becomes COMPLETED; once the attempts are exhausted it
        becomes ERROR. ``post_processing`` is called at the end unless the job
        was cancelled, halted by an error, or could not be launched.
        """
        log_target = self._resolve_log_target()

        while self.failed_attempts < self.maximum_attempts and self.status is not RenderStatus.COMPLETED:

            if self.failed_attempts:
                logger.info(f'Attempt #{self.failed_attempts} failed. Starting attempt #{self.failed_attempts + 1}')

            # Start process and get updates
            try:
                subprocess_cmds = self.generate_subprocess()
                logger.debug("Renderer commands generated - {}".format(" ".join(subprocess_cmds)))
                self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE,
                                                  stderr=subprocess.STDOUT, universal_newlines=False)
            except Exception as e:
                # Top-level thread boundary: without this the job would stay RUNNING forever.
                msg = f"Unable to launch {self.engine.name()} render: {e}"
                logger.exception(msg)
                self.errors.append(msg)
                if self.status is not RenderStatus.CANCELLED:
                    self.status = RenderStatus.ERROR
                self.end_time = datetime.now()
                self.is_finished = True
                return

            self.start_time = datetime.now()

            with open(log_target, "a", encoding="utf-8") as f:

                f.write(f"{self.start_time.isoformat()} - Starting {self.engine.name()} {self.engine.version()} "
                        f"Render for {self.input_path}\n")
                f.write(f"Running command: {' '.join(subprocess_cmds)}\n")

                # errors="replace" keeps a stray non-UTF-8 byte from killing the worker thread.
                for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8", errors="replace"):
                    f.write(c)
                    stripped = c.strip()
                    logger.debug(f"{self.engine.name()}Worker: {stripped}")
                    self.last_output = stripped
                    self._parse_stdout(stripped)
                f.write('\n')

                # Check return codes
                return_code = self.__process.wait()
                self.end_time = datetime.now()

                # Return early if job was cancelled or halted by stop()
                if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
                    self.is_finished = True
                    return

                duration = self.end_time - self.start_time

                if return_code:
                    message = f"{self.engine.name()} render failed with return_code {return_code} after {duration}"
                    logger.error(message)
                    self.failed_attempts = self.failed_attempts + 1
                else:
                    message = f"{self.engine.name()} render completed successfully in {duration}"
                    logger.info(message)
                    self.status = RenderStatus.COMPLETED

                f.write(message + '\n')

        if self.failed_attempts >= self.maximum_attempts and self.status is not RenderStatus.CANCELLED:
            logger.error('{} Render of {} failed after {} attempts'.format(self.engine.name(), self.input_path,
                                                                           self.failed_attempts))
            self.status = RenderStatus.ERROR
            if not self.errors:
                # No explicit error was recorded; fall back to the renderer's last output line.
                self.errors = [self.last_output]
        self.is_finished = True

        self.post_processing()

    def post_processing(self):
        """Hook called at the end of ``run``; does nothing by default."""
        pass

    def is_running(self):
        """Return True while the worker thread is alive."""
        if self.__thread:
            return self.__thread.is_alive()
        return False

    def log_error(self, error_line, halt_render=False):
        """Log and record *error_line*; optionally stop the render as an error."""
        logger.error(error_line)
        self.errors.append(error_line)
        if halt_render:
            self.stop(is_error=True)

    def stop(self, is_error=False):
        """Cancel the job (or halt it as an error) and kill the renderer process tree.

        The status only changes if the job is RUNNING, NOT_STARTED or
        SCHEDULED, and it changes whether or not a subprocess has been
        launched yet, so a job can be cancelled before it starts.

        Args:
            is_error: When True the job ends in ERROR instead of CANCELLED.
        """
        if self.status in [RenderStatus.RUNNING, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED]:
            if is_error:
                err_message = self.errors[-1] if self.errors else 'Unknown error'
                logger.error(f"Halting render due to error: {err_message}")
                self.status = RenderStatus.ERROR
            else:
                self.status = RenderStatus.CANCELLED
            # Prevent run() from starting another attempt.
            self.maximum_attempts = 0

        if self.__process:
            self._kill_process_tree(self.__process.pid)

    @staticmethod
    def _kill_process_tree(pid):
        """Kill the process *pid* and all of its descendants, logging (not raising) failures."""
        try:
            parent = psutil.Process(pid)
            targets = parent.children(recursive=True) + [parent]
        except psutil.Error as e:
            logger.debug(f"Error stopping the process: {e}")
            return

        for proc in targets:
            # Handle each process separately so one that already exited does not spare the rest.
            try:
                proc.kill()
            except psutil.Error as e:
                logger.debug(f"Error stopping the process: {e}")

    def percent_complete(self):
        """Return render progress; the base implementation always returns 0."""
        return 0

    def _parse_stdout(self, line):
        """Handle one stripped line of renderer output. Must be implemented by subclasses."""
        raise NotImplementedError("_parse_stdout not implemented")

    def time_elapsed(self):
        """Return the elapsed render time as ``HH:MM:SS``, or ``"Unknown"``.

        Uses ``end_time - start_time`` when both are set, or the time since
        ``start_time`` while the job is RUNNING. The hours field counts total
        hours, so renders longer than a day do not wrap around.
        """
        start_time = self.start_time
        end_time = self.end_time

        if not start_time:
            return "Unknown"

        if end_time:
            elapsed_time = end_time - start_time
        elif self.status == RenderStatus.RUNNING:
            elapsed_time = datetime.now() - start_time
        else:
            return "Unknown"

        # total_seconds() includes whole days; clamp so a clock adjustment cannot go negative.
        total_seconds = max(int(elapsed_time.total_seconds()), 0)
        hours, remainder = divmod(total_seconds, 3600)
        minutes, seconds = divmod(remainder, 60)
        return '{:02d}:{:02d}:{:02d}'.format(hours, minutes, seconds)

    def json(self):
        """Return a JSON-compatible dict describing this worker.

        Built from the instance ``__dict__`` with every key starting with an
        underscore removed, plus computed ``percent_complete``,
        ``time_elapsed`` and ``status`` entries. Datetimes become ISO-8601
        strings.
        """
        worker_data = {key: value for key, value in self.__dict__.items() if not key.startswith('_')}
        for unwanted in ('thread', 'process'):
            worker_data.pop(unwanted, None)

        worker_data['percent_complete'] = self.percent_complete()
        worker_data['time_elapsed'] = self.time_elapsed()
        worker_data['status'] = self.status.value
        worker_data['renderer'] = self.renderer
        worker_data['name'] = self.name
        worker_data['renderer_version'] = self.renderer_version

        def date_serializer(o):
            if isinstance(o, datetime):
                return o.isoformat()
            # Any other non-serializable value is emitted as null.
            return None

        # convert to json and back to auto-convert dates to iso format
        return json.loads(json.dumps(worker_data, default=date_serializer))
def timecode_to_frames(timecode, frame_rate):
    """Convert a colon-separated timecode to a 1-based frame number.

    The first three fields are read as hours, minutes and seconds and the
    last field as a frame offset within the second.

    Args:
        timecode: String of integer fields separated by ``:``.
            NOTE(review): presumably ``HH:MM:SS:FF`` - with only three fields the
            last one is counted both as seconds and as frames; confirm with callers.
        frame_rate: Frames per second.

    Returns:
        ``total_seconds * frame_rate + frame_offset + 1``.

    Raises:
        ValueError: If a field is not an integer.
        IndexError: If fewer than three fields are given.
    """
    fields = [int(part) for part in timecode.split(':')]
    hours, minutes, secs = fields[0], fields[1], fields[2]
    # Hours must be converted to minutes before the combined value is scaled to seconds.
    total_seconds = (hours * 60 + minutes) * 60 + secs
    return (total_seconds * frame_rate) + fields[-1] + 1