mirror of
https://github.com/blw1138/Zordon.git
synced 2025-12-17 08:48:13 +00:00
Improve performance on several API calls (#80)
* Streamline fetching renderer_info from API - use threading for performance improvements * Use concurrent.futures instead of Threading * Fix timeout issue with server proxy * Minor fixes to code that handles proxy server online / offline status
This commit is contained in:
@@ -19,6 +19,7 @@ setuptools>=69.0.3
|
|||||||
pandas>=2.2.0
|
pandas>=2.2.0
|
||||||
matplotlib>=3.8.2
|
matplotlib>=3.8.2
|
||||||
MarkupSafe>=2.1.4
|
MarkupSafe>=2.1.4
|
||||||
|
dmglib>=0.9.5
|
||||||
python-dateutil>=2.8.2
|
python-dateutil>=2.8.2
|
||||||
certifi>=2023.11.17
|
certifi>=2023.11.17
|
||||||
shiboken6>=6.6.1
|
shiboken6>=6.6.1
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
import concurrent.futures
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import multiprocessing
|
import multiprocessing
|
||||||
@@ -341,13 +342,6 @@ def clear_history():
|
|||||||
|
|
||||||
@server.route('/api/status')
|
@server.route('/api/status')
|
||||||
def status():
|
def status():
|
||||||
renderer_data = {}
|
|
||||||
for render_class in EngineManager.supported_engines():
|
|
||||||
if EngineManager.all_versions_for_engine(render_class.name()): # only return renderers installed on host
|
|
||||||
renderer_data[render_class.name()] = \
|
|
||||||
{'versions': EngineManager.all_versions_for_engine(render_class.name()),
|
|
||||||
'is_available': RenderQueue.is_available_for_job(render_class.name())
|
|
||||||
}
|
|
||||||
|
|
||||||
# Get system info
|
# Get system info
|
||||||
return {"timestamp": datetime.now().isoformat(),
|
return {"timestamp": datetime.now().isoformat(),
|
||||||
@@ -361,7 +355,6 @@ def status():
|
|||||||
"memory_available": psutil.virtual_memory().available,
|
"memory_available": psutil.virtual_memory().available,
|
||||||
"memory_percent": psutil.virtual_memory().percent,
|
"memory_percent": psutil.virtual_memory().percent,
|
||||||
"job_counts": RenderQueue.job_counts(),
|
"job_counts": RenderQueue.job_counts(),
|
||||||
"renderers": renderer_data,
|
|
||||||
"hostname": server.config['HOSTNAME'],
|
"hostname": server.config['HOSTNAME'],
|
||||||
"port": server.config['PORT']
|
"port": server.config['PORT']
|
||||||
}
|
}
|
||||||
@@ -369,19 +362,38 @@ def status():
|
|||||||
|
|
||||||
@server.get('/api/renderer_info')
|
@server.get('/api/renderer_info')
|
||||||
def renderer_info():
|
def renderer_info():
|
||||||
|
|
||||||
|
def process_engine(engine):
|
||||||
|
try:
|
||||||
|
# Get all installed versions of the engine
|
||||||
|
installed_versions = EngineManager.all_versions_for_engine(engine.name())
|
||||||
|
if installed_versions:
|
||||||
|
# Use system-installed versions to avoid permission issues
|
||||||
|
system_installed_versions = [x for x in installed_versions if x['type'] == 'system']
|
||||||
|
install_path = system_installed_versions[0]['path'] if system_installed_versions else \
|
||||||
|
installed_versions[0]['path']
|
||||||
|
|
||||||
|
return {
|
||||||
|
engine.name(): {
|
||||||
|
'is_available': RenderQueue.is_available_for_job(engine.name()),
|
||||||
|
'versions': installed_versions,
|
||||||
|
'supported_extensions': engine.supported_extensions(),
|
||||||
|
'supported_export_formats': engine(install_path).get_output_formats()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f'Error fetching details for {engine.name()} renderer: {e}')
|
||||||
|
return {}
|
||||||
|
|
||||||
renderer_data = {}
|
renderer_data = {}
|
||||||
for engine in EngineManager.supported_engines():
|
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||||
# Get all installed versions of engine
|
futures = {executor.submit(process_engine, engine): engine.name() for engine in EngineManager.supported_engines()}
|
||||||
installed_versions = EngineManager.all_versions_for_engine(engine.name())
|
|
||||||
if installed_versions:
|
for future in concurrent.futures.as_completed(futures):
|
||||||
# fixme: using system versions only because downloaded versions may have permissions issues
|
result = future.result()
|
||||||
system_installed_versions = [x for x in installed_versions if x['type'] == 'system']
|
if result:
|
||||||
install_path = system_installed_versions[0]['path'] if system_installed_versions \
|
renderer_data.update(result)
|
||||||
else (installed_versions)[0]['path']
|
|
||||||
renderer_data[engine.name()] = {'is_available': RenderQueue.is_available_for_job(engine.name()),
|
|
||||||
'versions': installed_versions,
|
|
||||||
'supported_extensions': engine.supported_extensions(),
|
|
||||||
'supported_export_formats': engine(install_path).get_output_formats()}
|
|
||||||
return renderer_data
|
return renderer_data
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -65,7 +65,7 @@ class RenderServerProxy:
|
|||||||
if self.__update_in_background:
|
if self.__update_in_background:
|
||||||
return self.__offline_flags < OFFLINE_MAX
|
return self.__offline_flags < OFFLINE_MAX
|
||||||
else:
|
else:
|
||||||
return self.connect() is not None
|
return self.get_status() is not None
|
||||||
|
|
||||||
def status(self):
|
def status(self):
|
||||||
if not self.is_online():
|
if not self.is_online():
|
||||||
@@ -76,8 +76,9 @@ class RenderServerProxy:
|
|||||||
def request_data(self, payload, timeout=5):
|
def request_data(self, payload, timeout=5):
|
||||||
try:
|
try:
|
||||||
req = self.request(payload, timeout)
|
req = self.request(payload, timeout)
|
||||||
if req.ok and req.status_code == 200:
|
if req.ok:
|
||||||
self.__offline_flags = 0
|
self.__offline_flags = 0
|
||||||
|
if req.status_code == 200:
|
||||||
return req.json()
|
return req.json()
|
||||||
except json.JSONDecodeError as e:
|
except json.JSONDecodeError as e:
|
||||||
logger.debug(f"JSON decode error: {e}")
|
logger.debug(f"JSON decode error: {e}")
|
||||||
@@ -90,10 +91,10 @@ class RenderServerProxy:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.exception(f"Uncaught exception: {e}")
|
logger.exception(f"Uncaught exception: {e}")
|
||||||
|
|
||||||
# If server unexpectedly drops off the network, remove from Zeroconf list
|
# If server unexpectedly drops off the network, stop background updates
|
||||||
if self.__offline_flags > OFFLINE_MAX:
|
if self.__offline_flags > OFFLINE_MAX:
|
||||||
try:
|
try:
|
||||||
ZeroconfServer.client_cache.pop(self.hostname)
|
self.stop_background_update()
|
||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
return None
|
return None
|
||||||
@@ -108,9 +109,11 @@ class RenderServerProxy:
|
|||||||
self.__update_in_background = True
|
self.__update_in_background = True
|
||||||
|
|
||||||
def thread_worker():
|
def thread_worker():
|
||||||
|
logger.debug(f'Starting background updates for {self.hostname}')
|
||||||
while self.__update_in_background:
|
while self.__update_in_background:
|
||||||
self.__update_job_cache()
|
self.__update_job_cache()
|
||||||
time.sleep(self.update_cadence)
|
time.sleep(self.update_cadence)
|
||||||
|
logger.debug(f'Stopping background updates for {self.hostname}')
|
||||||
|
|
||||||
self.__background_thread = threading.Thread(target=thread_worker)
|
self.__background_thread = threading.Thread(target=thread_worker)
|
||||||
self.__background_thread.daemon = True
|
self.__background_thread.daemon = True
|
||||||
@@ -127,7 +130,11 @@ class RenderServerProxy:
|
|||||||
self.__update_job_cache(timeout, ignore_token)
|
self.__update_job_cache(timeout, ignore_token)
|
||||||
return self.__jobs_cache.copy() if self.__jobs_cache else None
|
return self.__jobs_cache.copy() if self.__jobs_cache else None
|
||||||
|
|
||||||
def __update_job_cache(self, timeout=30, ignore_token=False):
|
def __update_job_cache(self, timeout=40, ignore_token=False):
|
||||||
|
|
||||||
|
if self.__offline_flags: # if we're offline, don't bother with the long poll
|
||||||
|
ignore_token = True
|
||||||
|
|
||||||
url = f'jobs_long_poll?token={self.__jobs_cache_token}' if (self.__jobs_cache_token and
|
url = f'jobs_long_poll?token={self.__jobs_cache_token}' if (self.__jobs_cache_token and
|
||||||
not ignore_token) else 'jobs'
|
not ignore_token) else 'jobs'
|
||||||
status_result = self.request_data(url, timeout=timeout)
|
status_result = self.request_data(url, timeout=timeout)
|
||||||
@@ -151,7 +158,7 @@ class RenderServerProxy:
|
|||||||
|
|
||||||
def get_status(self):
|
def get_status(self):
|
||||||
status = self.request_data('status')
|
status = self.request_data('status')
|
||||||
if not self.system_cpu:
|
if status and not self.system_cpu:
|
||||||
self.system_cpu = status['system_cpu']
|
self.system_cpu = status['system_cpu']
|
||||||
self.system_cpu_count = status['cpu_count']
|
self.system_cpu_count = status['cpu_count']
|
||||||
self.system_os = status['system_os']
|
self.system_os = status['system_os']
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ class ServerProxyManager:
|
|||||||
if state_change == ServiceStateChange.Added or state_change == ServiceStateChange.Updated:
|
if state_change == ServiceStateChange.Added or state_change == ServiceStateChange.Updated:
|
||||||
cls.get_proxy_for_hostname(hostname)
|
cls.get_proxy_for_hostname(hostname)
|
||||||
else:
|
else:
|
||||||
|
cls.get_proxy_for_hostname(hostname).stop_background_update()
|
||||||
cls.server_proxys.pop(hostname)
|
cls.server_proxys.pop(hostname)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ import logging
|
|||||||
import os
|
import os
|
||||||
import shutil
|
import shutil
|
||||||
import threading
|
import threading
|
||||||
|
import concurrent.futures
|
||||||
|
|
||||||
from src.engines.blender.blender_engine import Blender
|
from src.engines.blender.blender_engine import Blender
|
||||||
from src.engines.ffmpeg.ffmpeg_engine import FFMPEG
|
from src.engines.ffmpeg.ffmpeg_engine import FFMPEG
|
||||||
@@ -26,7 +27,7 @@ class EngineManager:
|
|||||||
return obj
|
return obj
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def all_engines(cls):
|
def all_engines(cls, filter_name=None):
|
||||||
|
|
||||||
if not cls.engines_path:
|
if not cls.engines_path:
|
||||||
raise FileNotFoundError("Engines path must be set before requesting downloads")
|
raise FileNotFoundError("Engines path must be set before requesting downloads")
|
||||||
@@ -36,47 +37,65 @@ class EngineManager:
|
|||||||
try:
|
try:
|
||||||
all_items = os.listdir(cls.engines_path)
|
all_items = os.listdir(cls.engines_path)
|
||||||
all_directories = [item for item in all_items if os.path.isdir(os.path.join(cls.engines_path, item))]
|
all_directories = [item for item in all_items if os.path.isdir(os.path.join(cls.engines_path, item))]
|
||||||
|
keys = ["engine", "version", "system_os", "cpu"] # Define keys for result dictionary
|
||||||
|
|
||||||
for directory in all_directories:
|
for directory in all_directories:
|
||||||
# Split the input string by dashes to get segments
|
# Split directory name into segments
|
||||||
segments = directory.split('-')
|
segments = directory.split('-')
|
||||||
|
# Create a dictionary mapping keys to corresponding segments
|
||||||
# Create a dictionary with named keys
|
|
||||||
keys = ["engine", "version", "system_os", "cpu"]
|
|
||||||
result_dict = {keys[i]: segments[i] for i in range(min(len(keys), len(segments)))}
|
result_dict = {keys[i]: segments[i] for i in range(min(len(keys), len(segments)))}
|
||||||
result_dict['type'] = 'managed'
|
result_dict['type'] = 'managed'
|
||||||
|
|
||||||
# Figure out the binary name for the path
|
# Initialize binary_name with engine name
|
||||||
binary_name = result_dict['engine'].lower()
|
binary_name = result_dict['engine'].lower()
|
||||||
|
# Determine the correct binary name based on the engine and system_os
|
||||||
for eng in cls.supported_engines():
|
for eng in cls.supported_engines():
|
||||||
if eng.name().lower() == result_dict['engine']:
|
if eng.name().lower() == result_dict['engine']:
|
||||||
binary_name = eng.binary_names.get(result_dict['system_os'], binary_name)
|
binary_name = eng.binary_names.get(result_dict['system_os'], binary_name)
|
||||||
|
|
||||||
# Find path to binary
|
|
||||||
path = None
|
|
||||||
for root, _, files in os.walk(system_safe_path(os.path.join(cls.engines_path, directory))):
|
|
||||||
if binary_name in files:
|
|
||||||
path = os.path.join(root, binary_name)
|
|
||||||
break
|
break
|
||||||
|
|
||||||
|
# Find the path to the binary file
|
||||||
|
path = next(
|
||||||
|
(os.path.join(root, binary_name) for root, _, files in
|
||||||
|
os.walk(system_safe_path(os.path.join(cls.engines_path, directory))) if binary_name in files),
|
||||||
|
None
|
||||||
|
)
|
||||||
|
|
||||||
result_dict['path'] = path
|
result_dict['path'] = path
|
||||||
results.append(result_dict)
|
# Add the result dictionary to results if it matches the filter_name or if no filter is applied
|
||||||
|
if not filter_name or filter_name == result_dict['engine']:
|
||||||
|
results.append(result_dict)
|
||||||
except FileNotFoundError as e:
|
except FileNotFoundError as e:
|
||||||
logger.warning(f"Cannot find local engines download directory: {e}")
|
logger.warning(f"Cannot find local engines download directory: {e}")
|
||||||
|
|
||||||
# add system installs to this list
|
# add system installs to this list - use bg thread because it can be slow
|
||||||
for eng in cls.supported_engines():
|
def fetch_engine_details(eng):
|
||||||
if eng.default_renderer_path():
|
return {
|
||||||
results.append({'engine': eng.name(), 'version': eng().version(),
|
'engine': eng.name(),
|
||||||
'system_os': current_system_os(),
|
'version': eng().version(),
|
||||||
'cpu': current_system_cpu(),
|
'system_os': current_system_os(),
|
||||||
'path': eng.default_renderer_path(), 'type': 'system'})
|
'cpu': current_system_cpu(),
|
||||||
|
'path': eng.default_renderer_path(),
|
||||||
|
'type': 'system'
|
||||||
|
}
|
||||||
|
|
||||||
|
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||||
|
futures = {
|
||||||
|
executor.submit(fetch_engine_details, eng): eng.name()
|
||||||
|
for eng in cls.supported_engines()
|
||||||
|
if eng.default_renderer_path() and (not filter_name or filter_name == eng.name())
|
||||||
|
}
|
||||||
|
|
||||||
|
for future in concurrent.futures.as_completed(futures):
|
||||||
|
result = future.result()
|
||||||
|
if result:
|
||||||
|
results.append(result)
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def all_versions_for_engine(cls, engine):
|
def all_versions_for_engine(cls, engine_name):
|
||||||
versions = [x for x in cls.all_engines() if x['engine'] == engine]
|
versions = cls.all_engines(filter_name=engine_name)
|
||||||
sorted_versions = sorted(versions, key=lambda x: x['version'], reverse=True)
|
sorted_versions = sorted(versions, key=lambda x: x['version'], reverse=True)
|
||||||
return sorted_versions
|
return sorted_versions
|
||||||
|
|
||||||
|
|||||||
@@ -97,9 +97,6 @@ class RenderQueue:
|
|||||||
@classmethod
|
@classmethod
|
||||||
def is_available_for_job(cls, renderer, priority=2):
|
def is_available_for_job(cls, renderer, priority=2):
|
||||||
|
|
||||||
if not EngineManager.all_versions_for_engine(renderer):
|
|
||||||
return False
|
|
||||||
|
|
||||||
instances = cls.renderer_instances()
|
instances = cls.renderer_instances()
|
||||||
higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < priority]
|
higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < priority]
|
||||||
max_allowed_instances = cls.maximum_renderer_instances.get(renderer, 1)
|
max_allowed_instances = cls.maximum_renderer_instances.get(renderer, 1)
|
||||||
|
|||||||
Reference in New Issue
Block a user