mirror of
https://github.com/blw1138/Zordon.git
synced 2025-12-17 16:58:12 +00:00
Remove Old Multi-Client Code / Refactoring (#13)
* Remove a lot of old code from render_queue regarding clients * More code cleanup * More code cleanup * Move everything around * Minor log change
This commit is contained in:
323
lib/workers/base_worker.py
Normal file
323
lib/workers/base_worker.py
Normal file
@@ -0,0 +1,323 @@
|
||||
#!/usr/bin/env python3
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import threading
|
||||
import json
|
||||
import glob
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
|
||||
from sqlalchemy import Column, Integer, String, DateTime
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from lib.utilities.misc_helper import get_time_elapsed
|
||||
|
||||
import psutil
|
||||
|
||||
logger = logging.getLogger()
|
||||
Base = declarative_base()
|
||||
|
||||
|
||||
class RenderStatus(Enum):
|
||||
NOT_STARTED = "not_started"
|
||||
RUNNING = "running"
|
||||
COMPLETED = "completed"
|
||||
CANCELLED = "cancelled"
|
||||
ERROR = "error"
|
||||
SCHEDULED = "scheduled"
|
||||
UNDEFINED = "undefined"
|
||||
|
||||
|
||||
def string_to_status(string):
|
||||
for stat in RenderStatus:
|
||||
if stat.value == string:
|
||||
return stat
|
||||
return RenderStatus.UNDEFINED
|
||||
|
||||
|
||||
class BaseRenderWorker(Base):
|
||||
__tablename__ = 'render_workers'
|
||||
|
||||
id = Column(String, primary_key=True)
|
||||
input_path = Column(String)
|
||||
output_path = Column(String)
|
||||
date_created = Column(DateTime)
|
||||
start_time = Column(DateTime, nullable=True)
|
||||
end_time = Column(DateTime, nullable=True)
|
||||
renderer = Column(String)
|
||||
renderer_version = Column(String)
|
||||
priority = Column(Integer)
|
||||
total_frames = Column(Integer)
|
||||
owner = Column(String)
|
||||
client = Column(String)
|
||||
name = Column(String)
|
||||
file_hash = Column(String)
|
||||
_status = Column(String)
|
||||
|
||||
engine = None
|
||||
|
||||
def __init__(self, input_path, output_path, priority=2, args=None, ignore_extensions=True, owner=None, client=None,
|
||||
name=None):
|
||||
|
||||
if not ignore_extensions:
|
||||
if not any(ext in input_path for ext in self.engine.supported_extensions):
|
||||
err_meg = f'Cannot find valid project with supported file extension for {self.engine.name()} renderer'
|
||||
logger.error(err_meg)
|
||||
raise ValueError(err_meg)
|
||||
if not self.engine:
|
||||
raise NotImplementedError("Engine not defined")
|
||||
|
||||
def generate_id():
|
||||
import uuid
|
||||
return str(uuid.uuid4()).split('-')[0]
|
||||
|
||||
# Essential Info
|
||||
self.id = generate_id()
|
||||
self.input_path = input_path
|
||||
self.output_path = output_path
|
||||
self.args = args or {}
|
||||
self.date_created = datetime.now()
|
||||
self.renderer = self.engine.name()
|
||||
self.renderer_version = self.engine.version()
|
||||
self.priority = priority
|
||||
self.owner = owner
|
||||
self.client = client
|
||||
self.name = name
|
||||
|
||||
# Frame Ranges
|
||||
self.total_frames = 0
|
||||
self.current_frame = 0
|
||||
|
||||
# Logging
|
||||
self.start_time = None
|
||||
self.end_time = None
|
||||
|
||||
# History
|
||||
self.status = RenderStatus.NOT_STARTED
|
||||
self.warnings = []
|
||||
self.errors = []
|
||||
self.failed_attempts = 0
|
||||
self.maximum_attempts = 1
|
||||
|
||||
# Threads and processes
|
||||
self.__thread = threading.Thread(target=self.run, args=())
|
||||
self.__thread.daemon = True
|
||||
self.__process = None
|
||||
self.is_finished = False
|
||||
self.last_output = None
|
||||
|
||||
@property
|
||||
def status(self):
|
||||
return self._status
|
||||
|
||||
@status.setter
|
||||
def status(self, value):
|
||||
self._status = value.value
|
||||
|
||||
@status.getter
|
||||
def status(self):
|
||||
if self._status in [RenderStatus.RUNNING.value, RenderStatus.NOT_STARTED.value]:
|
||||
if not hasattr(self, 'errors'):
|
||||
return RenderStatus.CANCELLED
|
||||
return string_to_status(self._status)
|
||||
|
||||
def validate(self):
|
||||
if not os.path.exists(self.input_path):
|
||||
raise FileNotFoundError(f"Cannot find input path: {self.input_path}")
|
||||
self.generate_subprocess()
|
||||
|
||||
def generate_subprocess(self):
|
||||
# Convert raw args from string if available and catch conflicts
|
||||
generated_args = self.generate_worker_subprocess()
|
||||
generated_args_flags = [x for x in generated_args if x.startswith('-')]
|
||||
if len(generated_args_flags) != len(set(generated_args_flags)):
|
||||
msg = "Cannot generate subprocess - Multiple arg conflicts detected"
|
||||
logger.error(msg)
|
||||
logger.debug(f"Generated args for subprocess: {generated_args}")
|
||||
raise ValueError(msg)
|
||||
return generated_args
|
||||
|
||||
def get_raw_args(self):
|
||||
raw_args_string = self.args.get('raw', None)
|
||||
raw_args = None
|
||||
if raw_args_string:
|
||||
import shlex
|
||||
raw_args = shlex.split(raw_args_string)
|
||||
return raw_args
|
||||
|
||||
def generate_worker_subprocess(self):
|
||||
raise NotImplementedError("generate_worker_subprocess not implemented")
|
||||
|
||||
def log_path(self):
|
||||
filename = (self.name or os.path.basename(self.input_path)) + '_' + \
|
||||
self.date_created.strftime("%Y.%m.%d_%H.%M.%S") + '.log'
|
||||
return os.path.join(os.path.dirname(self.input_path), filename)
|
||||
|
||||
def start(self):
|
||||
|
||||
if self.status not in [RenderStatus.SCHEDULED, RenderStatus.NOT_STARTED]:
|
||||
logger.error(f"Trying to start job with status: {self.status}")
|
||||
return
|
||||
|
||||
if not os.path.exists(self.input_path):
|
||||
self.status = RenderStatus.ERROR
|
||||
msg = 'Cannot find input path: {}'.format(self.input_path)
|
||||
logger.error(msg)
|
||||
self.errors.append(msg)
|
||||
return
|
||||
|
||||
if not self.engine.renderer_path():
|
||||
self.status = RenderStatus.ERROR
|
||||
msg = 'Cannot find render engine path for {}'.format(self.engine.name())
|
||||
logger.error(msg)
|
||||
self.errors.append(msg)
|
||||
return
|
||||
|
||||
self.status = RenderStatus.RUNNING
|
||||
logger.info(f'Starting {self.engine.name()} {self.engine.version()} Render for {self.input_path}')
|
||||
self.__thread.start()
|
||||
|
||||
def run(self):
|
||||
# Setup logging
|
||||
log_dir = os.path.dirname(self.log_path())
|
||||
os.makedirs(log_dir, exist_ok=True)
|
||||
|
||||
while self.failed_attempts < self.maximum_attempts and self.status is not RenderStatus.COMPLETED:
|
||||
|
||||
if self.failed_attempts:
|
||||
logger.info(f'Attempt #{self.failed_attempts} failed. Starting attempt #{self.failed_attempts + 1}')
|
||||
|
||||
# Start process and get updates
|
||||
subprocess_cmds = self.generate_subprocess()
|
||||
logger.debug("Renderer commands generated - {}".format(" ".join(subprocess_cmds)))
|
||||
self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
|
||||
universal_newlines=False)
|
||||
self.start_time = datetime.now()
|
||||
|
||||
with open(self.log_path(), "a") as f:
|
||||
|
||||
f.write(f"{self.start_time.isoformat()} - Starting {self.engine.name()} {self.engine.version()} "
|
||||
f"Render for {self.input_path}")
|
||||
f.write(f"Running command: {' '.join(subprocess_cmds)}\n")
|
||||
for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8"): # or another encoding
|
||||
f.write(c)
|
||||
logger.debug(f"{self.engine.name()}Worker: {c.strip()}")
|
||||
self.last_output = c.strip()
|
||||
self._parse_stdout(c.strip())
|
||||
f.write('\n')
|
||||
|
||||
# Check return codes
|
||||
return_code = self.__process.wait()
|
||||
self.end_time = datetime.now()
|
||||
# Return early if job was cancelled
|
||||
if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
|
||||
self.is_finished = True
|
||||
return
|
||||
|
||||
if return_code:
|
||||
message = f"{self.engine.name()} render failed with return_code {return_code} after {self.time_elapsed()}"
|
||||
logger.error(message)
|
||||
self.failed_attempts = self.failed_attempts + 1
|
||||
else:
|
||||
message = f"{self.engine.name()} render completed successfully in {self.time_elapsed()}"
|
||||
logger.info(message)
|
||||
self.status = RenderStatus.COMPLETED
|
||||
|
||||
f.write(message)
|
||||
|
||||
if self.failed_attempts >= self.maximum_attempts and self.status is not RenderStatus.CANCELLED:
|
||||
logger.error('{} Render of {} failed after {} attempts'.format(self.engine.name(), self.input_path,
|
||||
self.failed_attempts))
|
||||
self.status = RenderStatus.ERROR
|
||||
if not self.errors:
|
||||
self.errors = [self.last_output]
|
||||
self.is_finished = True
|
||||
self.post_processing()
|
||||
|
||||
def post_processing(self):
|
||||
pass
|
||||
|
||||
def is_running(self):
|
||||
if self.__thread:
|
||||
return self.__thread.is_alive()
|
||||
return False
|
||||
|
||||
def log_error(self, error_line, halt_render=False):
|
||||
logger.error(error_line)
|
||||
self.errors.append(error_line)
|
||||
if halt_render:
|
||||
self.stop(is_error=True)
|
||||
|
||||
def stop(self, is_error=False):
|
||||
if hasattr(self, '__process'):
|
||||
try:
|
||||
self.maximum_attempts = 0
|
||||
process = psutil.Process(self.__process.pid)
|
||||
for proc in process.children(recursive=True):
|
||||
proc.kill()
|
||||
process.kill()
|
||||
except Exception as e:
|
||||
logger.debug(f"Error stopping the process: {e}")
|
||||
if self.status in [RenderStatus.RUNNING, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED]:
|
||||
if is_error:
|
||||
err_message = self.errors[-1] if self.errors else 'Unknown error'
|
||||
logger.error(f"Halting render due to error: {err_message}")
|
||||
self.status = RenderStatus.ERROR
|
||||
else:
|
||||
self.status = RenderStatus.CANCELLED
|
||||
|
||||
def percent_complete(self):
|
||||
return 0
|
||||
|
||||
def _parse_stdout(self, line):
|
||||
raise NotImplementedError("_parse_stdout not implemented")
|
||||
|
||||
def time_elapsed(self):
|
||||
return get_time_elapsed(self.start_time, self.end_time)
|
||||
|
||||
def file_list(self):
|
||||
job_dir = os.path.dirname(self.output_path)
|
||||
file_list = glob.glob(os.path.join(job_dir, '*'))
|
||||
file_list.sort()
|
||||
return file_list
|
||||
|
||||
def json(self):
|
||||
job_dict = {
|
||||
'id': self.id,
|
||||
'name': self.name,
|
||||
'input_path': self.input_path,
|
||||
'output_path': self.output_path,
|
||||
'priority': self.priority,
|
||||
'owner': self.owner,
|
||||
'client': self.client,
|
||||
'date_created': self.date_created,
|
||||
'start_time': self.start_time,
|
||||
'end_time': self.end_time,
|
||||
'status': self.status.value,
|
||||
'file_hash': self.file_hash,
|
||||
'percent_complete': self.percent_complete(),
|
||||
'file_count': len(self.file_list()),
|
||||
'renderer': self.renderer,
|
||||
'renderer_version': self.renderer_version,
|
||||
'errors': getattr(self, 'errors', None),
|
||||
'total_frames': self.total_frames,
|
||||
'last_output': getattr(self, 'last_output', None),
|
||||
'log_path': self.log_path()
|
||||
}
|
||||
|
||||
# convert to json and back to auto-convert dates to iso format
|
||||
def date_serializer(o):
|
||||
if isinstance(o, datetime):
|
||||
return o.isoformat()
|
||||
|
||||
json_convert = json.dumps(job_dict, default=date_serializer)
|
||||
worker_json = json.loads(json_convert)
|
||||
return worker_json
|
||||
|
||||
|
||||
def timecode_to_frames(timecode, frame_rate):
|
||||
e = [int(x) for x in timecode.split(':')]
|
||||
seconds = (((e[0] * 60) + e[1] * 60) + e[2])
|
||||
frames = (seconds * frame_rate) + e[-1] + 1
|
||||
return frames
|
||||
Reference in New Issue
Block a user