From 21de69ca4f523fe7c8fc55a07ab45a6dc004efaf Mon Sep 17 00:00:00 2001 From: Brett Date: Sat, 3 Aug 2024 11:02:40 -0500 Subject: [PATCH] Improve performance on several API calls (#80) * Streamline fetching renderer_info from API - use threading for performance improvements * Use concurrent.futures instead of Threading * Fix timeout issue with server proxy * Minor fixes to code that handles proxy server online / offline status --- requirements.txt | 1 + src/api/api_server.py | 52 +++++++++++++++++----------- src/api/server_proxy.py | 19 ++++++---- src/api/serverproxy_manager.py | 1 + src/engines/engine_manager.py | 63 ++++++++++++++++++++++------------ src/render_queue.py | 3 -- 6 files changed, 88 insertions(+), 51 deletions(-) diff --git a/requirements.txt b/requirements.txt index efec5e2..e45160c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,6 +19,7 @@ setuptools>=69.0.3 pandas>=2.2.0 matplotlib>=3.8.2 MarkupSafe>=2.1.4 +dmglib>=0.9.5 python-dateutil>=2.8.2 certifi>=2023.11.17 shiboken6>=6.6.1 diff --git a/src/api/api_server.py b/src/api/api_server.py index 8c9c418..f125355 100755 --- a/src/api/api_server.py +++ b/src/api/api_server.py @@ -1,4 +1,5 @@ #!/usr/bin/env python3 +import concurrent.futures import json import logging import multiprocessing @@ -341,13 +342,6 @@ def clear_history(): @server.route('/api/status') def status(): - renderer_data = {} - for render_class in EngineManager.supported_engines(): - if EngineManager.all_versions_for_engine(render_class.name()): # only return renderers installed on host - renderer_data[render_class.name()] = \ - {'versions': EngineManager.all_versions_for_engine(render_class.name()), - 'is_available': RenderQueue.is_available_for_job(render_class.name()) - } # Get system info return {"timestamp": datetime.now().isoformat(), @@ -361,7 +355,6 @@ def status(): "memory_available": psutil.virtual_memory().available, "memory_percent": psutil.virtual_memory().percent, "job_counts": RenderQueue.job_counts(), - "renderers": renderer_data, "hostname": server.config['HOSTNAME'], "port": server.config['PORT'] } @@ -369,19 +362,38 @@ def status(): @server.get('/api/renderer_info') def renderer_info(): + + def process_engine(engine): + try: + # Get all installed versions of the engine + installed_versions = EngineManager.all_versions_for_engine(engine.name()) + if installed_versions: + # Use system-installed versions to avoid permission issues + system_installed_versions = [x for x in installed_versions if x['type'] == 'system'] + install_path = system_installed_versions[0]['path'] if system_installed_versions else \ + installed_versions[0]['path'] + + return { + engine.name(): { + 'is_available': RenderQueue.is_available_for_job(engine.name()), + 'versions': installed_versions, + 'supported_extensions': engine.supported_extensions(), + 'supported_export_formats': engine(install_path).get_output_formats() + } + } + except Exception as e: + logger.error(f'Error fetching details for {engine.name()} renderer: {e}') + return {} + renderer_data = {} - for engine in EngineManager.supported_engines(): - # Get all installed versions of engine - installed_versions = EngineManager.all_versions_for_engine(engine.name()) - if installed_versions: - # fixme: using system versions only because downloaded versions may have permissions issues - system_installed_versions = [x for x in installed_versions if x['type'] == 'system'] - install_path = system_installed_versions[0]['path'] if system_installed_versions \ - else (installed_versions)[0]['path'] - renderer_data[engine.name()] = {'is_available': RenderQueue.is_available_for_job(engine.name()), - 'versions': installed_versions, - 'supported_extensions': engine.supported_extensions(), - 'supported_export_formats': engine(install_path).get_output_formats()} + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = {executor.submit(process_engine, engine): engine.name() for engine in EngineManager.supported_engines()} + + for future in concurrent.futures.as_completed(futures): + result = future.result() + if result: + renderer_data.update(result) + return renderer_data diff --git a/src/api/server_proxy.py b/src/api/server_proxy.py index 5f82431..4160dd9 100644 --- a/src/api/server_proxy.py +++ b/src/api/server_proxy.py @@ -65,7 +65,7 @@ class RenderServerProxy: if self.__update_in_background: return self.__offline_flags < OFFLINE_MAX else: - return self.connect() is not None + return self.get_status() is not None def status(self): if not self.is_online(): @@ -76,8 +76,9 @@ class RenderServerProxy: def request_data(self, payload, timeout=5): try: req = self.request(payload, timeout) - if req.ok and req.status_code == 200: + if req.ok: self.__offline_flags = 0 + if req.status_code == 200: return req.json() except json.JSONDecodeError as e: logger.debug(f"JSON decode error: {e}") @@ -90,10 +91,10 @@ class RenderServerProxy: except Exception as e: logger.exception(f"Uncaught exception: {e}") - # If server unexpectedly drops off the network, remove from Zeroconf list + # If server unexpectedly drops off the network, stop background updates if self.__offline_flags > OFFLINE_MAX: try: - ZeroconfServer.client_cache.pop(self.hostname) + self.stop_background_update() except KeyError: pass return None @@ -108,9 +109,11 @@ class RenderServerProxy: self.__update_in_background = True def thread_worker(): + logger.debug(f'Starting background updates for {self.hostname}') while self.__update_in_background: self.__update_job_cache() time.sleep(self.update_cadence) + logger.debug(f'Stopping background updates for {self.hostname}') self.__background_thread = threading.Thread(target=thread_worker) self.__background_thread.daemon = True @@ -127,7 +130,11 @@ class RenderServerProxy: self.__update_job_cache(timeout, ignore_token) return self.__jobs_cache.copy() if self.__jobs_cache else None - def __update_job_cache(self, timeout=30, ignore_token=False): + def __update_job_cache(self, timeout=40, ignore_token=False): + + if self.__offline_flags: # if we're offline, don't bother with the long poll + ignore_token = True + url = f'jobs_long_poll?token={self.__jobs_cache_token}' if (self.__jobs_cache_token and not ignore_token) else 'jobs' status_result = self.request_data(url, timeout=timeout) @@ -151,7 +158,7 @@ class RenderServerProxy: def get_status(self): status = self.request_data('status') - if not self.system_cpu: + if status and not self.system_cpu: self.system_cpu = status['system_cpu'] self.system_cpu_count = status['cpu_count'] self.system_os = status['system_os'] diff --git a/src/api/serverproxy_manager.py b/src/api/serverproxy_manager.py index 7e94a8c..37bf1d9 100644 --- a/src/api/serverproxy_manager.py +++ b/src/api/serverproxy_manager.py @@ -21,6 +21,7 @@ class ServerProxyManager: if state_change == ServiceStateChange.Added or state_change == ServiceStateChange.Updated: cls.get_proxy_for_hostname(hostname) else: + cls.get_proxy_for_hostname(hostname).stop_background_update() cls.server_proxys.pop(hostname) @classmethod diff --git a/src/engines/engine_manager.py b/src/engines/engine_manager.py index de7ba49..41a1fbc 100644 --- a/src/engines/engine_manager.py +++ b/src/engines/engine_manager.py @@ -2,6 +2,7 @@ import logging import os import shutil import threading +import concurrent.futures from src.engines.blender.blender_engine import Blender from src.engines.ffmpeg.ffmpeg_engine import FFMPEG @@ -26,7 +27,7 @@ class EngineManager: return obj @classmethod - def all_engines(cls): + def all_engines(cls, filter_name=None): if not cls.engines_path: raise FileNotFoundError("Engines path must be set before requesting downloads") @@ -36,47 +37,65 @@ class EngineManager: try: all_items = os.listdir(cls.engines_path) all_directories = [item for item in all_items if os.path.isdir(os.path.join(cls.engines_path, item))] + keys = ["engine", "version", "system_os", "cpu"] # Define keys for result dictionary for directory in all_directories: - # Split the input string by dashes to get segments + # Split directory name into segments segments = directory.split('-') - - # Create a dictionary with named keys - keys = ["engine", "version", "system_os", "cpu"] + # Create a dictionary mapping keys to corresponding segments result_dict = {keys[i]: segments[i] for i in range(min(len(keys), len(segments)))} result_dict['type'] = 'managed' - # Figure out the binary name for the path + # Initialize binary_name with engine name binary_name = result_dict['engine'].lower() + # Determine the correct binary name based on the engine and system_os for eng in cls.supported_engines(): if eng.name().lower() == result_dict['engine']: binary_name = eng.binary_names.get(result_dict['system_os'], binary_name) - - # Find path to binary - path = None - for root, _, files in os.walk(system_safe_path(os.path.join(cls.engines_path, directory))): - if binary_name in files: - path = os.path.join(root, binary_name) break + # Find the path to the binary file + path = next( + (os.path.join(root, binary_name) for root, _, files in + os.walk(system_safe_path(os.path.join(cls.engines_path, directory))) if binary_name in files), + None + ) + result_dict['path'] = path - results.append(result_dict) + # Add the result dictionary to results if it matches the filter_name or if no filter is applied + if not filter_name or filter_name == result_dict['engine']: + results.append(result_dict) except FileNotFoundError as e: logger.warning(f"Cannot find local engines download directory: {e}") - # add system installs to this list - for eng in cls.supported_engines(): - if eng.default_renderer_path(): - results.append({'engine': eng.name(), 'version': eng().version(), - 'system_os': current_system_os(), - 'cpu': current_system_cpu(), - 'path': eng.default_renderer_path(), 'type': 'system'}) + # add system installs to this list - use bg thread because it can be slow + def fetch_engine_details(eng): + return { + 'engine': eng.name(), + 'version': eng().version(), + 'system_os': current_system_os(), + 'cpu': current_system_cpu(), + 'path': eng.default_renderer_path(), + 'type': 'system' + } + + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = { + executor.submit(fetch_engine_details, eng): eng.name() + for eng in cls.supported_engines() + if eng.default_renderer_path() and (not filter_name or filter_name == eng.name()) + } + + for future in concurrent.futures.as_completed(futures): + result = future.result() + if result: + results.append(result) return results @classmethod - def all_versions_for_engine(cls, engine): - versions = [x for x in cls.all_engines() if x['engine'] == engine] + def all_versions_for_engine(cls, engine_name): + versions = cls.all_engines(filter_name=engine_name) sorted_versions = sorted(versions, key=lambda x: x['version'], reverse=True) return sorted_versions diff --git a/src/render_queue.py b/src/render_queue.py index 884b7a3..23170c6 100755 --- a/src/render_queue.py +++ b/src/render_queue.py @@ -97,9 +97,6 @@ class RenderQueue: @classmethod def is_available_for_job(cls, renderer, priority=2): - if not EngineManager.all_versions_for_engine(renderer): - return False - instances = cls.renderer_instances() higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < priority] max_allowed_instances = cls.maximum_renderer_instances.get(renderer, 1)