Wait for subjob completion and download render files to host (#17)

* Fix Blender image sequence -> video conversion and change video to use ProRes

* Wait for child jobs to complete

* Download and extract render files from subjobs

* Fix issue where zip was not removed

* Update client to use new method names in server proxy

* Fix minor download issue
This commit is contained in:
2023-06-15 17:44:34 -05:00
committed by GitHub
parent 0a0a228731
commit e6eb344d19
6 changed files with 154 additions and 103 deletions

View File

@@ -1,18 +1,20 @@
#!/usr/bin/env python3
import io
import json
import logging
import os
import subprocess
import threading
import json
import time
import zipfile
from datetime import datetime
from enum import Enum
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from lib.utilities.misc_helper import get_time_elapsed
import psutil
from sqlalchemy import Column, Integer, String, DateTime, JSON
from sqlalchemy.ext.declarative import declarative_base
from lib.utilities.misc_helper import get_time_elapsed
logger = logging.getLogger()
Base = declarative_base()
@@ -25,6 +27,7 @@ class RenderStatus(Enum):
CANCELLED = "cancelled"
ERROR = "error"
SCHEDULED = "scheduled"
WAITING = "waiting"
NOT_READY = "not_ready"
UNDEFINED = "undefined"
@@ -52,7 +55,7 @@ class BaseRenderWorker(Base):
start_frame = Column(Integer)
end_frame = Column(Integer, nullable=True)
parent = Column(String, nullable=True)
children = Column(String, nullable=True)
children = Column(JSON)
name = Column(String)
file_hash = Column(String)
_status = Column(String)
@@ -84,7 +87,7 @@ class BaseRenderWorker(Base):
self.renderer_version = self.engine.version()
self.priority = priority
self.parent = parent
self.children = None
self.children = {}
self.name = name or os.path.basename(input_path)
# Frame Ranges
@@ -101,14 +104,11 @@ class BaseRenderWorker(Base):
self.status = RenderStatus.NOT_READY
self.warnings = []
self.errors = []
self.failed_attempts = 0
self.maximum_attempts = 1
# Threads and processes
self.__thread = threading.Thread(target=self.run, args=())
self.__thread.daemon = True
self.__process = None
self.is_finished = False
self.last_output = None
@property
@@ -192,56 +192,88 @@ class BaseRenderWorker(Base):
log_dir = os.path.dirname(self.log_path())
os.makedirs(log_dir, exist_ok=True)
while self.failed_attempts < self.maximum_attempts and self.status is not RenderStatus.COMPLETED:
# Start process and get updates
subprocess_cmds = self.generate_subprocess()
logger.debug("Renderer commands generated - {}".format(" ".join(subprocess_cmds)))
self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
universal_newlines=False)
if self.failed_attempts:
logger.info(f'Attempt #{self.failed_attempts} failed. Starting attempt #{self.failed_attempts + 1}')
with open(self.log_path(), "a") as f:
# Start process and get updates
subprocess_cmds = self.generate_subprocess()
logger.debug("Renderer commands generated - {}".format(" ".join(subprocess_cmds)))
self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
universal_newlines=False)
f.write(f"{self.start_time.isoformat()} - Starting {self.engine.name()} {self.engine.version()} "
f"Render for {self.input_path}")
f.write(f"Running command: {' '.join(subprocess_cmds)}\n")
for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8"): # or another encoding
f.write(c)
logger.debug(f"{self.engine.name()}Worker: {c.strip()}")
self.last_output = c.strip()
self._parse_stdout(c.strip())
f.write('\n')
with open(self.log_path(), "a") as f:
f.write(f"{self.start_time.isoformat()} - Starting {self.engine.name()} {self.engine.version()} "
f"Render for {self.input_path}")
f.write(f"Running command: {' '.join(subprocess_cmds)}\n")
for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8"): # or another encoding
f.write(c)
logger.debug(f"{self.engine.name()}Worker: {c.strip()}")
self.last_output = c.strip()
self._parse_stdout(c.strip())
f.write('\n')
# Check return codes
return_code = self.__process.wait()
self.end_time = datetime.now()
# Return early if job was cancelled
if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
self.is_finished = True
return
if return_code:
message = f"{self.engine.name()} render failed with return_code {return_code} after {self.time_elapsed()}"
logger.error(message)
self.failed_attempts = self.failed_attempts + 1
else:
message = f"{self.engine.name()} render completed successfully in {self.time_elapsed()}"
logger.info(message)
self.status = RenderStatus.COMPLETED
# Check return codes
return_code = self.__process.wait()
self.end_time = datetime.now()
# Return early if job was cancelled
if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
return
if return_code:
message = f"{self.engine.name()} render failed with return_code {return_code} after {self.time_elapsed()}"
logger.error(message)
f.write(message)
self.status = RenderStatus.ERROR
if not self.errors:
self.errors = [message]
return
if self.failed_attempts >= self.maximum_attempts and self.status is not RenderStatus.CANCELLED:
logger.error('{} Render of {} failed after {} attempts'.format(self.engine.name(), self.input_path,
self.failed_attempts))
self.status = RenderStatus.ERROR
if not self.errors:
self.errors = [self.last_output]
self.is_finished = True
message = f"{self.engine.name()} render completed successfully in {self.time_elapsed()}"
logger.info(message)
f.write(message)
from lib.server.server_proxy import RenderServerProxy
# Wait on children jobs, if necessary
if self.children:
self.status = RenderStatus.WAITING
subjobs_still_running = self.children.copy()
while len(subjobs_still_running):
for hostname, job_id in subjobs_still_running.copy().items():
proxy = RenderServerProxy(hostname)
response = proxy.get_job_info(job_id)
if not response:
logger.warning(f"No response from: {hostname}")
else:
status = string_to_status(response.get('status', ''))
status_msg = f"Subjob {job_id}@{hostname} | Status: {status} | {response.get('percent_complete')}%"
if status in [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED]:
logger.info(f"Downloading completed subjob files from {hostname} to localhost")
try:
zip_file_path = self.output_path + f'_{hostname}_{job_id}.zip'
proxy.get_job_files(job_id, zip_file_path)
logger.debug("Zip file download successfully - Preparing to unzip.")
extract_path = os.path.dirname(zip_file_path)
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
zip_ref.extractall(extract_path)
logger.info(f"Successfully extracted zip to: {extract_path}")
os.remove(zip_file_path)
except Exception as e:
err_msg = f"Error transferring output from subjob {job_id}@{hostname}: {e}"
logger.exception(err_msg)
self.errors.append(err_msg)
finally:
subjobs_still_running.pop(hostname)
else:
logger.debug(status_msg)
logger.debug(f"Waiting on {len(subjobs_still_running)} subjobs on {', '.join(list(subjobs_still_running.keys()))}")
time.sleep(5)
logger.info("All subjobs complete")
# Post Render Work
logger.debug("Starting post-processing work")
self.post_processing()
self.status = RenderStatus.COMPLETED
def post_processing(self):
pass
@@ -260,7 +292,6 @@ class BaseRenderWorker(Base):
def stop(self, is_error=False):
if hasattr(self, '__process'):
try:
self.maximum_attempts = 0
process = psutil.Process(self.__process.pid)
for proc in process.children(recursive=True):
proc.kill()
@@ -285,10 +316,13 @@ class BaseRenderWorker(Base):
return get_time_elapsed(self.start_time, self.end_time)
def file_list(self):
job_dir = os.path.dirname(self.output_path)
file_list = [os.path.join(job_dir, file) for file in os.listdir(job_dir)]
file_list.sort()
return file_list
try:
job_dir = os.path.dirname(self.output_path)
file_list = [os.path.join(job_dir, file) for file in os.listdir(job_dir)]
file_list.sort()
return file_list
except FileNotFoundError:
return []
def json(self):
job_dict = {