diff --git a/config/config.yaml b/config/config.yaml index 94469b4..2c2ff3c 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -3,7 +3,7 @@ update_engines_on_launch: true max_content_path: 100000000 server_log_level: info log_buffer_length: 250 -subjob_connection_timeout: 120 +worker_process_timeout: 120 flask_log_level: error flask_debug_enable: false queue_eval_seconds: 1 diff --git a/src/api/api_server.py b/src/api/api_server.py index a2ef762..781b575 100755 --- a/src/api/api_server.py +++ b/src/api/api_server.py @@ -61,7 +61,7 @@ def jobs_json(): job_cache_token = num_to_alphanumeric(job_cache_int) return {'jobs': all_jobs, 'token': job_cache_token} except Exception as e: - logger.exception(f"Exception fetching jobs_json: {e}") + logger.error(f"Error fetching jobs_json: {e}") return {}, 500 @@ -79,7 +79,7 @@ def long_polling_jobs(): return {}, 204 time.sleep(1) except Exception as e: - logger.exception(f"Exception fetching long_polling_jobs: {e}") + logger.error(f"Error fetching long_polling_jobs: {e}") return {}, 500 diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py index c67ef40..d01a5af 100644 --- a/src/distributed_job_manager.py +++ b/src/distributed_job_manager.py @@ -14,6 +14,7 @@ from src.api.preview_manager import PreviewManager from src.api.server_proxy import RenderServerProxy from src.engines.engine_manager import EngineManager from src.render_queue import RenderQueue +from src.utilities.config import Config from src.utilities.misc_helper import get_file_size_human from src.utilities.status_utils import RenderStatus, string_to_status from src.utilities.zeroconf_server import ZeroconfServer @@ -135,8 +136,7 @@ class DistributedJobManager: """ Creates render jobs. - This method takes a list of job data, a local path to a loaded project, and a job directory. It creates a render - job for each job data in the list and appends the result to a list. The list of results is then returned. 
+ This method takes job data and a local path to a loaded project. It creates and returns a new render job. Args: job_data (dict): Job data. @@ -172,6 +172,7 @@ class DistributedJobManager: worker.priority = int(job_data.get('priority', worker.priority)) worker.start_frame = int(job_data.get("start_frame", worker.start_frame)) worker.end_frame = int(job_data.get("end_frame", worker.end_frame)) + worker.watchdog_timeout = Config.worker_process_timeout worker.hostname = socket.gethostname() # determine if we can / should split the job diff --git a/src/engines/core/base_worker.py b/src/engines/core/base_worker.py index ee3e00c..e64f349 100644 --- a/src/engines/core/base_worker.py +++ b/src/engines/core/base_worker.py @@ -5,6 +5,7 @@ import logging import os import subprocess import threading +import time from datetime import datetime import psutil @@ -94,10 +95,12 @@ class BaseRenderWorker(Base): self.errors = [] # Threads and processes - self.__thread = threading.Thread(target=self.run, args=()) + self.__thread = threading.Thread(target=self.__run, args=()) self.__thread.daemon = True self.__process = None self.last_output = None + self.__last_output_time = None + self.watchdog_timeout = 120 def __repr__(self): return f"<{self.__class__.__name__}|{self.id}|{self.name}|{self.status}|{self.input_path}>" @@ -175,11 +178,12 @@ class BaseRenderWorker(Base): self.status = RenderStatus.RUNNING self.start_time = datetime.now() - logger.info(f'Starting {self.engine.name()} {self.renderer_version} Render for {self.input_path} | ' - f'Frame Count: {self.total_frames}') self.__thread.start() - def run(self): + def __run(self): + logger.info(f'Starting {self.engine.name()} {self.renderer_version} Render for {self.input_path} | ' + f'Frame Count: {self.total_frames}') + # Setup logging log_dir = os.path.dirname(self.log_path()) os.makedirs(log_dir, exist_ok=True) @@ -209,49 +213,43 @@ class BaseRenderWorker(Base): logger.warning(f"Restarting render - Attempt #{failed_attempts + 1}") 
self.status = RenderStatus.RUNNING - # Start process and get updates - self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, - universal_newlines=False) - - for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8"): # or another encoding - f.write(c) - f.flush() - os.fsync(f.fileno()) - self.last_output = c.strip() - self._parse_stdout(c.strip()) - - f.write('\n') - - # Check return codes and process - return_code = self.__process.wait() + return_code = self.__setup_and_run_process(f, subprocess_cmds) self.end_time = datetime.now() - if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: # user cancelled + message = f"{'=' * 50}\n\n{self.engine.name()} render ended with code {return_code} " \ + f"after {self.time_elapsed()}\n\n" + f.write(message) + + # Teardown + if self.status in [RenderStatus.CANCELLED, RenderStatus.ERROR]: message = f"{self.engine.name()} render ended with status '{self.status}' " \ f"after {self.time_elapsed()}" f.write(message) return # if file output hasn't increased, return as error, otherwise restart process. - if len(self.file_list()) <= initial_file_count: - err_msg = f"File count has not increased. 
Count is still {len(self.file_list())}" - f.write(f'Error: {err_msg}\n\n') - self.errors.append(err_msg) - self.status = RenderStatus.ERROR - - # Handle completed - All else counts as failed attempt - if (self.status == RenderStatus.RUNNING) and not return_code: + file_count_has_increased = len(self.file_list()) > initial_file_count + if (self.status == RenderStatus.RUNNING) and file_count_has_increased and not return_code: message = (f"{'=' * 50}\n\n{self.engine.name()} render completed successfully in " f"{self.time_elapsed()}\n") f.write(message) break - # Handle non-zero return codes - message = f"{'=' * 50}\n\n{self.engine.name()} render failed with code {return_code} " \ - f"after {self.time_elapsed()}\n\n" - f.write(message) - self.errors.append(message) - failed_attempts += 1 + if return_code: + err_msg = f"{self.engine.name()} render failed with code {return_code}" + logger.error(err_msg) + self.errors.append(err_msg) + + # handle instances where renderer exits ok but doesn't generate files + if not return_code and not file_count_has_increased: + err_msg = (f"{self.engine.name()} render exited ok, but file count has not increased. 
" + f"Count is still {len(self.file_list())}") + f.write(f'Error: {err_msg}\n\n') + self.errors.append(err_msg) + + # only count the attempt as failed if renderer creates no output - ignore error codes for now + if not file_count_has_increased: + failed_attempts += 1 if self.children: from src.distributed_job_manager import DistributedJobManager @@ -263,6 +261,65 @@ class BaseRenderWorker(Base): self.status = RenderStatus.COMPLETED logger.info(f"Render {self.id}-{self.name} completed successfully after {self.time_elapsed()}") + def __setup_and_run_process(self, f, subprocess_cmds): + + def watchdog(): + logger.debug(f'Starting process watchdog for {self} with {self.watchdog_timeout}s timeout') + while self.__process.poll() is None: + time_since_last_update = time.time() - self.__last_output_time + if time_since_last_update > self.watchdog_timeout: + logger.error(f"Process for {self} terminated due to exceeding timeout ({self.watchdog_timeout}s)") + self.__process.kill() + break + # logger.debug(f'Watchdog for {self} - Time since last update: {time_since_last_update}') + time.sleep(1) + + logger.debug(f'Stopping process watchdog for {self}') + + return_code = -1 + watchdog_thread = threading.Thread(target=watchdog) + watchdog_thread.daemon = True + + try: + # Start process and get updates + self.__process = subprocess.Popen(subprocess_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + universal_newlines=False) + + # Start watchdog + self.__last_output_time = time.time() + watchdog_thread.start() + + for c in io.TextIOWrapper(self.__process.stdout, encoding="utf-8"): # or another encoding + self.last_output = c.strip() + self.__last_output_time = time.time() + try: + f.write(c) + f.flush() + os.fsync(f.fileno()) + except Exception as e: + logger.error(f"Error saving log to disk: {e}") + + try: + self._parse_stdout(c.strip()) + except Exception as e: + logger.error(f'Error parsing stdout: {e}') + + f.write('\n') + + # Check return codes and process + 
return_code = self.__process.wait() + except Exception as e: + message = f'Uncaught error running render process: {e}' + f.write(message) + logger.exception(message) + self.__process.kill() + + # let watchdog end before continuing - prevents multiple watchdogs running when process restarts + if watchdog_thread.is_alive(): + watchdog_thread.join() + + return return_code + def post_processing(self): pass diff --git a/src/utilities/config.py b/src/utilities/config.py index 1fb435a..e6925c4 100644 --- a/src/utilities/config.py +++ b/src/utilities/config.py @@ -10,7 +10,7 @@ class Config: max_content_path = 100000000 server_log_level = 'debug' log_buffer_length = 250 - subjob_connection_timeout = 120 + worker_process_timeout = 120 flask_log_level = 'error' flask_debug_enable = False queue_eval_seconds = 1 @@ -28,7 +28,7 @@ class Config: cls.max_content_path = cfg.get('max_content_path', cls.max_content_path) cls.server_log_level = cfg.get('server_log_level', cls.server_log_level) cls.log_buffer_length = cfg.get('log_buffer_length', cls.log_buffer_length) - cls.subjob_connection_timeout = cfg.get('subjob_connection_timeout', cls.subjob_connection_timeout) + cls.worker_process_timeout = cfg.get('worker_process_timeout', cls.worker_process_timeout) cls.flask_log_level = cfg.get('flask_log_level', cls.flask_log_level) cls.flask_debug_enable = cfg.get('flask_debug_enable', cls.flask_debug_enable) cls.queue_eval_seconds = cfg.get('queue_eval_seconds', cls.queue_eval_seconds)