Integrate watchdog into render worker (#88)

* Add a watchdog to base_worker

* Logging cleanup

* Prevent multiple watchdogs from running if render process restarts

* Add process timeout parameter to Config

* Refactor

* Add error handling to process output parsing

* Fix issue where start_time was not getting set consistently
committed by GitHub on 2024-08-06 10:48:24 -05:00
parent 90d5e9b7af
commit 6afb6e65a6
5 changed files with 99 additions and 41 deletions
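
The core of the change is a per-worker watchdog: a daemon thread that watches the render subprocess and kills it once it has produced no output for a configurable number of seconds. A minimal, self-contained sketch of that pattern follows (illustrative only; run_with_watchdog and its names are not project code, the real implementation lives in BaseRenderWorker below):

    import subprocess
    import threading
    import time

    def run_with_watchdog(cmd, timeout_s=120):
        """Kill the subprocess if it emits no output for timeout_s seconds."""
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        last_output = time.time()

        def watchdog():
            # Poll once per second; stop as soon as the process exits on its own.
            while proc.poll() is None:
                if time.time() - last_output > timeout_s:
                    proc.kill()              # stalled: no output within the timeout
                    break
                time.sleep(1)

        monitor = threading.Thread(target=watchdog, daemon=True)
        monitor.start()
        for line in proc.stdout:
            last_output = time.time()        # any output resets the stall timer
            print(line, end='')              # forward the renderer's output
        return_code = proc.wait()
        monitor.join()                       # let the watchdog finish before any restart
        return return_code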

View File

@@ -3,7 +3,7 @@ update_engines_on_launch: true
 max_content_path: 100000000
 server_log_level: info
 log_buffer_length: 250
-subjob_connection_timeout: 120
+worker_process_timeout: 120
 flask_log_level: error
 flask_debug_enable: false
 queue_eval_seconds: 1

View File

@@ -61,7 +61,7 @@ def jobs_json():
         job_cache_token = num_to_alphanumeric(job_cache_int)
         return {'jobs': all_jobs, 'token': job_cache_token}
     except Exception as e:
-        logger.exception(f"Exception fetching jobs_json: {e}")
+        logger.error(f"Error fetching jobs_json: {e}")
         return {}, 500
@@ -79,7 +79,7 @@ def long_polling_jobs():
                 return {}, 204
             time.sleep(1)
     except Exception as e:
-        logger.exception(f"Exception fetching long_polling_jobs: {e}")
+        logger.error(f"Error fetching long_polling_jobs: {e}")
         return {}, 500
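
A note on the logging cleanup in these two handlers: logger.exception logs at ERROR level and appends the active traceback, while logger.error records only the message, so the new code trades stack traces for quieter logs. Standard-library behaviour, shown outside the project for clarity:

    import logging

    logging.basicConfig(level=logging.ERROR)
    logger = logging.getLogger(__name__)

    try:
        1 / 0
    except Exception as e:
        logger.error(f"Error fetching jobs_json: {e}")          # message only
        logger.exception(f"Exception fetching jobs_json: {e}")  # message plus traceback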

View File

@@ -14,6 +14,7 @@ from src.api.preview_manager import PreviewManager
 from src.api.server_proxy import RenderServerProxy
 from src.engines.engine_manager import EngineManager
 from src.render_queue import RenderQueue
+from src.utilities.config import Config
 from src.utilities.misc_helper import get_file_size_human
 from src.utilities.status_utils import RenderStatus, string_to_status
 from src.utilities.zeroconf_server import ZeroconfServer
@@ -135,8 +136,7 @@ class DistributedJobManager:
""" """
Creates render jobs. Creates render jobs.
This method takes a list of job data, a local path to a loaded project, and a job directory. It creates a render This method job data and a local path to a loaded project. It creates and returns new a render job.
job for each job data in the list and appends the result to a list. The list of results is then returned.
Args: Args:
job_data (dict): Job data. job_data (dict): Job data.
@@ -172,6 +172,7 @@ class DistributedJobManager:
             worker.priority = int(job_data.get('priority', worker.priority))
             worker.start_frame = int(job_data.get("start_frame", worker.start_frame))
             worker.end_frame = int(job_data.get("end_frame", worker.end_frame))
+            worker.watchdog_timeout = Config.worker_process_timeout
             worker.hostname = socket.gethostname()
 
             # determine if we can / should split the job
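
Both ends of the timeout plumbing are visible in this commit: BaseRenderWorker.__init__ (below) gives every worker a 120-second default, and the job manager overrides it from Config when it builds the worker. A simplified sketch of that flow (Worker here is a stand-in, not project code):

    class Config:
        worker_process_timeout = 120          # loaded from config.yaml at startup

    class Worker:
        def __init__(self):
            self.watchdog_timeout = 120       # per-worker default, as in BaseRenderWorker.__init__

    worker = Worker()
    worker.watchdog_timeout = Config.worker_process_timeout   # what the job manager now does
    print(worker.watchdog_timeout)            # 120 unless the config overrides it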

View File

@@ -5,6 +5,7 @@ import logging
 import os
 import subprocess
 import threading
+import time
 from datetime import datetime
 
 import psutil
@@ -94,10 +95,12 @@ class BaseRenderWorker(Base):
         self.errors = []
 
         # Threads and processes
-        self.__thread = threading.Thread(target=self.run, args=())
+        self.__thread = threading.Thread(target=self.__run, args=())
         self.__thread.daemon = True
         self.__process = None
         self.last_output = None
+        self.__last_output_time = None
+        self.watchdog_timeout = 120
 
     def __repr__(self):
         return f"<{self.__class__.__name__}|{self.id}|{self.name}|{self.status}|{self.input_path}>"
@@ -175,11 +178,12 @@ class BaseRenderWorker(Base):
         self.status = RenderStatus.RUNNING
         self.start_time = datetime.now()
+        logger.info(f'Starting {self.engine.name()} {self.renderer_version} Render for {self.input_path} | '
+                    f'Frame Count: {self.total_frames}')
         self.__thread.start()
 
-    def run(self):
-        logger.info(f'Starting {self.engine.name()} {self.renderer_version} Render for {self.input_path} | '
-                    f'Frame Count: {self.total_frames}')
+    def __run(self):
         # Setup logging
         log_dir = os.path.dirname(self.log_path())
         os.makedirs(log_dir, exist_ok=True)
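
This hunk looks like the start_time fix from the commit message: run() becomes the private __run(), so a render can only be launched through start(), which is where self.start_time is set, and the startup log line moves there with it. Roughly, the call order this enforces:

    # worker.start()  -> sets status and start_time, logs the startup line, starts the worker thread
    # worker.run()    -> no longer public; the thread now targets the private __run()
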
@@ -209,48 +213,42 @@ class BaseRenderWorker(Base):
logger.warning(f"Restarting render - Attempt #{failed_attempts + 1}") logger.warning(f"Restarting render - Attempt #{failed_attempts + 1}")
self.status = RenderStatus.RUNNING self.status = RenderStatus.RUNNING
# Start process and get updates return_code = self.__setup_and_run_process(f, subprocess_cmds)
self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
universal_newlines=False)
for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8"): # or another encoding
f.write(c)
f.flush()
os.fsync(f.fileno())
self.last_output = c.strip()
self._parse_stdout(c.strip())
f.write('\n')
# Check return codes and process
return_code = self.__process.wait()
self.end_time = datetime.now() self.end_time = datetime.now()
if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: # user cancelled message = f"{'=' * 50}\n\n{self.engine.name()} render ended with code {return_code} " \
f"after {self.time_elapsed()}\n\n"
f.write(message)
# Teardown
if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
message = f"{self.engine.name()} render ended with status '{self.status}' " \ message = f"{self.engine.name()} render ended with status '{self.status}' " \
f"after {self.time_elapsed()}" f"after {self.time_elapsed()}"
f.write(message) f.write(message)
return return
# if file output hasn't increased, return as error, otherwise restart process. # if file output hasn't increased, return as error, otherwise restart process.
if len(self.file_list()) <= initial_file_count: file_count_has_increased = len(self.file_list()) > initial_file_count
err_msg = f"File count has not increased. Count is still {len(self.file_list())}" if (self.status == RenderStatus.RUNNING) and file_count_has_increased and not return_code:
f.write(f'Error: {err_msg}\n\n')
self.errors.append(err_msg)
self.status = RenderStatus.ERROR
# Handle completed - All else counts as failed attempt
if (self.status == RenderStatus.RUNNING) and not return_code:
message = (f"{'=' * 50}\n\n{self.engine.name()} render completed successfully in " message = (f"{'=' * 50}\n\n{self.engine.name()} render completed successfully in "
f"{self.time_elapsed()}\n") f"{self.time_elapsed()}\n")
f.write(message) f.write(message)
break break
# Handle non-zero return codes if return_code:
message = f"{'=' * 50}\n\n{self.engine.name()} render failed with code {return_code} " \ err_msg = f"{self.engine.name()} render failed with code {return_code}"
f"after {self.time_elapsed()}\n\n" logger.error(err_msg)
f.write(message) self.errors.append(err_msg)
self.errors.append(message)
# handle instances where renderer exits ok but doesnt generate files
if not return_code and not file_count_has_increased:
err_msg = (f"{self.engine.name()} render exited ok, but file count has not increased. "
f"Count is still {len(self.file_list())}")
f.write(f'Error: {err_msg}\n\n')
self.errors.append(err_msg)
# only count the attempt as failed if renderer creates no output - ignore error codes for now
if not file_count_has_increased:
failed_attempts += 1 failed_attempts += 1
if self.children: if self.children:
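
Net effect of the reworked branch logic above: a non-zero exit code is logged and recorded, but it no longer forces a retry by itself; an attempt only counts as failed, and the render only restarts, when the renderer produced no new output files. Summarised (illustrative comments, not project code):

    # status RUNNING, return code 0, new files  -> completed, break out of the retry loop
    # non-zero return code                      -> error appended to self.errors, not decisive on its own
    # return code 0 but no new files            -> "exited ok, but file count has not increased" error
    # no new output files (either case)         -> failed_attempts += 1 and the loop retries the render
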
@@ -263,6 +261,65 @@ class BaseRenderWorker(Base):
         self.status = RenderStatus.COMPLETED
         logger.info(f"Render {self.id}-{self.name} completed successfully after {self.time_elapsed()}")
 
+    def __setup_and_run_process(self, f, subprocess_cmds):
+
+        def watchdog():
+            logger.debug(f'Starting process watchdog for {self} with {self.watchdog_timeout}s timeout')
+            while self.__process.poll() is None:
+                time_since_last_update = time.time() - self.__last_output_time
+                if time_since_last_update > self.watchdog_timeout:
+                    logger.error(f"Process for {self} terminated due to exceeding timeout ({self.watchdog_timeout}s)")
+                    self.__process.kill()
+                    break
+                # logger.debug(f'Watchdog for {self} - Time since last update: {time_since_last_update}')
+                time.sleep(1)
+            logger.debug(f'Stopping process watchdog for {self}')
+
+        return_code = -1
+        watchdog_thread = threading.Thread(target=watchdog)
+        watchdog_thread.daemon = True
+
+        try:
+            # Start process and get updates
+            self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+                                              universal_newlines=False)
+
+            # Start watchdog
+            self.__last_output_time = time.time()
+            watchdog_thread.start()
+
+            for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8"):  # or another encoding
+                self.last_output = c.strip()
+                self.__last_output_time = time.time()
+                try:
+                    f.write(c)
+                    f.flush()
+                    os.fsync(f.fileno())
+                except Exception as e:
+                    logger.error(f"Error saving log to disk: {e}")
+                try:
+                    self._parse_stdout(c.strip())
+                except Exception as e:
+                    logger.error(f'Error parsing stdout: {e}')
+            f.write('\n')
+
+            # Check return codes and process
+            return_code = self.__process.wait()
+        except Exception as e:
+            message = f'Uncaught error running render process: {e}'
+            f.write(message)
+            logger.exception(message)
+            self.__process.kill()
+
+        # let watchdog end before continuing - prevents multiple watchdogs running when process restarts
+        if watchdog_thread.is_alive():
+            watchdog_thread.join()
+
+        return return_code
+
     def post_processing(self):
         pass

View File

@@ -10,7 +10,7 @@ class Config:
     max_content_path = 100000000
     server_log_level = 'debug'
     log_buffer_length = 250
-    subjob_connection_timeout = 120
+    worker_process_timeout = 120
     flask_log_level = 'error'
     flask_debug_enable = False
     queue_eval_seconds = 1
@@ -28,7 +28,7 @@ class Config:
         cls.max_content_path = cfg.get('max_content_path', cls.max_content_path)
         cls.server_log_level = cfg.get('server_log_level', cls.server_log_level)
         cls.log_buffer_length = cfg.get('log_buffer_length', cls.log_buffer_length)
-        cls.subjob_connection_timeout = cfg.get('subjob_connection_timeout', cls.subjob_connection_timeout)
+        cls.worker_process_timeout = cfg.get('worker_process_timeout', cls.worker_process_timeout)
         cls.flask_log_level = cfg.get('flask_log_level', cls.flask_log_level)
         cls.flask_debug_enable = cfg.get('flask_debug_enable', cls.flask_debug_enable)
        cls.queue_eval_seconds = cfg.get('queue_eval_seconds', cls.queue_eval_seconds)
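
One practical consequence of the rename, judging only from the lines changed here: cfg.get() falls back to the class default, so an older config file that still sets subjob_connection_timeout is silently ignored and the 120-second default applies. For example:

    cfg = {'subjob_connection_timeout': 300}          # legacy key from an older config file
    timeout = cfg.get('worker_process_timeout', 120)  # new key is absent
    print(timeout)                                    # 120; the legacy value no longer has any effect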