From f01192909dbee2d66ddbe2e4f41d60c646b643c3 Mon Sep 17 00:00:00 2001 From: Brett Date: Wed, 25 Oct 2023 02:49:07 -0500 Subject: [PATCH] Split add job helper (#45) * Fix issue with engine not being available to download * Add version caches to ffmpeg downloader * Remove some test parameters * "releases" should be "release" in linux ffmpeg url * CPU was incorrectly reported as OS * Fix naming structure for FFMPEG downloads for linux * More linux ffmpeg work * Improved error handling * WIP * Consolidate platform reporting to not use platform directly * Fix missing folder name * Fix project output naming * Undo config.yaml commit * Add is_engine_available API call * Fix issue where subjobs would not find servers --- config/config.yaml | 3 +- src/api/add_job_helpers.py | 33 +++++---- src/api/api_server.py | 21 +++--- src/api/server_proxy.py | 3 + src/distributed_job_manager.py | 13 ++-- src/engines/blender/blender_downloader.py | 5 +- src/engines/engine_manager.py | 37 +++++----- src/engines/ffmpeg/ffmpeg_downloader.py | 84 ++++++++++++++++------- src/utilities/misc_helper.py | 7 +- 9 files changed, 130 insertions(+), 76 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index 8a93c6d..4185d9f 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -6,4 +6,5 @@ flask_log_level: error flask_debug_enable: false queue_eval_seconds: 1 port_number: 8080 -enable_split_jobs: true \ No newline at end of file +enable_split_jobs: true +download_timeout_seconds: 120 \ No newline at end of file diff --git a/src/api/add_job_helpers.py b/src/api/add_job_helpers.py index 4edb011..28280b3 100644 --- a/src/api/add_job_helpers.py +++ b/src/api/add_job_helpers.py @@ -44,22 +44,22 @@ def handle_uploaded_project_files(request, jobs_list, upload_directory): job_dir = os.path.join(upload_directory, '-'.join( [datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), renderer, cleaned_path_name])) os.makedirs(job_dir, exist_ok=True) - upload_dir = os.path.join(job_dir, 'source') - 
os.makedirs(upload_dir, exist_ok=True) + project_source_dir = os.path.join(job_dir, 'source') + os.makedirs(project_source_dir, exist_ok=True) # Move projects to their work directories if uploaded_project and uploaded_project.filename: - loaded_project_local_path = os.path.join(upload_dir, secure_filename(uploaded_project.filename)) + loaded_project_local_path = os.path.join(project_source_dir, secure_filename(uploaded_project.filename)) uploaded_project.save(loaded_project_local_path) - logger.info(f"Transfer complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}") + logger.info(f"Transfer complete for {loaded_project_local_path.split(upload_directory)[-1]}") elif project_url: - loaded_project_local_path = os.path.join(upload_dir, referred_name) + loaded_project_local_path = os.path.join(project_source_dir, referred_name) shutil.move(downloaded_file_url, loaded_project_local_path) - logger.info(f"Download complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}") + logger.info(f"Download complete for {loaded_project_local_path.split(upload_directory)[-1]}") elif local_path: - loaded_project_local_path = os.path.join(upload_dir, referred_name) + loaded_project_local_path = os.path.join(project_source_dir, referred_name) shutil.copy(local_path, loaded_project_local_path) - logger.info(f"Import complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}") + logger.info(f"Import complete for {loaded_project_local_path.split(upload_directory)[-1]}") return loaded_project_local_path, referred_name @@ -137,14 +137,21 @@ def create_render_jobs(jobs_list, loaded_project_local_path, job_dir, enable_spl os.makedirs(output_dir, exist_ok=True) # get new output path in output_dir - job_data['output_path'] = os.path.join(output_dir, os.path.basename( - job_data.get('output_path', None) or loaded_project_local_path - )) + output_path = job_data.get('output_path') + if not output_path: + 
loaded_project_filename = os.path.basename(loaded_project_local_path) + output_filename = os.path.splitext(loaded_project_filename)[0] + else: + output_filename = os.path.basename(output_path) + + output_path = os.path.join(os.path.dirname(os.path.dirname(loaded_project_local_path)), 'output', + output_filename) + logger.debug(f"New job output path: {output_path}") # create & configure jobs worker = RenderWorkerFactory.create_worker(renderer=job_data['renderer'], input_path=loaded_project_local_path, - output_path=job_data["output_path"], + output_path=output_path, engine_version=job_data.get('engine_version'), args=job_data.get('args', {})) worker.status = job_data.get("initial_status", worker.status) @@ -157,6 +164,8 @@ def create_render_jobs(jobs_list, loaded_project_local_path, job_dir, enable_spl # determine if we can / should split the job if enable_split_jobs and (worker.total_frames > 1) and not worker.parent: DistributedJobManager.split_into_subjobs(worker, job_data, loaded_project_local_path) + else: + logger.debug("Not splitting into subjobs") RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False)) if not worker.parent: diff --git a/src/api/api_server.py b/src/api/api_server.py index 461cd6a..1eab8fb 100755 --- a/src/api/api_server.py +++ b/src/api/api_server.py @@ -3,7 +3,6 @@ import json import logging import os import pathlib -import platform import shutil import socket import ssl @@ -25,7 +24,7 @@ from src.engines.core.base_worker import string_to_status, RenderStatus from src.engines.core.worker_factory import RenderWorkerFactory from src.engines.engine_manager import EngineManager from src.render_queue import RenderQueue, JobNotFoundError -from src.utilities.misc_helper import system_safe_path +from src.utilities.misc_helper import system_safe_path, current_system_os, current_system_cpu, current_system_os_version from src.utilities.server_helper import generate_thumbnail_for_job from src.utilities.zeroconf_server 
import ZeroconfServer @@ -399,14 +398,10 @@ def status(): } # Get system info - system_platform = platform.system().lower().replace('darwin', 'macos') - system_platform_version = platform.mac_ver()[0] if system_platform == 'macos' else platform.release().lower() - system_cpu = platform.machine().lower().replace('amd64', 'x64') - return {"timestamp": datetime.now().isoformat(), - "system_platform": system_platform, - "system_platform_version": system_platform_version, - "system_cpu": system_cpu, + "system_os": current_system_os(), + "system_os_version": current_system_os_version(), + "system_cpu": current_system_cpu(), "cpu_percent": psutil.cpu_percent(percpu=False), "cpu_percent_per_cpu": psutil.cpu_percent(percpu=True), "cpu_count": psutil.cpu_count(logical=False), @@ -437,6 +432,14 @@ def renderer_info(): return renderer_data +@server.get('/api/<engine_name>/is_available') +def is_engine_available(engine_name): + return {'engine': engine_name, 'available': RenderQueue.is_available_for_job(engine_name), + 'cpu_count': int(psutil.cpu_count(logical=False)), + 'versions': EngineManager.all_versions_for_engine(engine_name), + 'hostname': server.config['HOSTNAME']} + + @server.get('/api/is_engine_available_to_download') def is_engine_available_to_download(): available_result = EngineManager.version_is_available_to_download(request.args.get('engine'), diff --git a/src/api/server_proxy.py b/src/api/server_proxy.py index 19c3cc4..295e0ed 100644 --- a/src/api/server_proxy.py +++ b/src/api/server_proxy.py @@ -116,6 +116,9 @@ class RenderServerProxy: def get_status(self): return self.request_data('status') + def is_engine_available(self, engine_name): + return self.request_data(f'{engine_name}/is_available') + def notify_parent_of_status_change(self, parent_id, subjob): return requests.post(f'http://{self.hostname}:{self.port}/api/job/{parent_id}/notify_parent_of_status_change', json=subjob.json()) diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py index 
08068f5..7690ed6 100644 --- a/src/distributed_job_manager.py +++ b/src/distributed_job_manager.py @@ -195,6 +195,7 @@ class DistributedJobManager: # Check availability available_servers = cls.find_available_servers(worker.renderer) + logger.debug(f"Splitting into subjobs - Available servers: {available_servers}") subjob_servers = cls.distribute_server_work(worker.start_frame, worker.end_frame, available_servers) local_hostname = socket.gethostname() @@ -323,17 +324,17 @@ class DistributedJobManager: return server_breakdown @staticmethod - def find_available_servers(renderer): + def find_available_servers(engine_name): """ - Scan the Zeroconf network for currently available render servers supporting a specific renderer. + Scan the Zeroconf network for currently available render servers supporting a specific engine. - :param renderer: str, The renderer type to search for + :param engine_name: str, The engine type to search for :return: A list of dictionaries with each dict containing hostname and cpu_count of available servers """ available_servers = [] for hostname in ZeroconfServer.found_clients(): - response = RenderServerProxy(hostname).get_status() - if response and response.get('renderers', {}).get(renderer, {}).get('is_available', False): - available_servers.append({'hostname': hostname, 'cpu_count': int(response['cpu_count'])}) + response = RenderServerProxy(hostname).is_engine_available(engine_name) + if response and response.get('available', False): + available_servers.append(response) return available_servers diff --git a/src/engines/blender/blender_downloader.py b/src/engines/blender/blender_downloader.py index 8045b57..060f59d 100644 --- a/src/engines/blender/blender_downloader.py +++ b/src/engines/blender/blender_downloader.py @@ -1,5 +1,4 @@ import logging -import platform import re import requests @@ -96,8 +95,8 @@ class BlenderDownloader: @classmethod def download_engine(cls, version, download_location, system_os=None, cpu=None, timeout=120): - 
system_os = system_os or platform.system().lower().replace('darwin', 'macos') - cpu = cpu or platform.machine().lower().replace('amd64', 'x64') + system_os = system_os or current_system_os() + cpu = cpu or current_system_cpu() try: logger.info(f"Requesting download of blender-{version}-{system_os}-{cpu}") diff --git a/src/engines/engine_manager.py b/src/engines/engine_manager.py index c2ebc0e..25b1e17 100644 --- a/src/engines/engine_manager.py +++ b/src/engines/engine_manager.py @@ -1,13 +1,12 @@ import logging import os -import platform import shutil from src.engines.blender.blender_downloader import BlenderDownloader from src.engines.blender.blender_engine import Blender from src.engines.ffmpeg.ffmpeg_downloader import FFMPEGDownloader from src.engines.ffmpeg.ffmpeg_engine import FFMPEG -from src.utilities.misc_helper import system_safe_path +from src.utilities.misc_helper import system_safe_path, current_system_os, current_system_cpu logger = logging.getLogger() @@ -64,8 +63,8 @@ class EngineManager: for eng in cls.supported_engines(): if eng.default_renderer_path(): results.append({'engine': eng.name(), 'version': eng().version(), - 'system_os': cls.system_os(), - 'cpu': cls.system_cpu(), + 'system_os': current_system_os(), + 'cpu': current_system_cpu(), 'path': eng.default_renderer_path(), 'type': 'system'}) return results @@ -76,8 +75,8 @@ class EngineManager: @classmethod def newest_engine_version(cls, engine, system_os=None, cpu=None): - system_os = system_os or cls.system_os() - cpu = cpu or cls.system_cpu() + system_os = system_os or current_system_os() + cpu = cpu or current_system_cpu() try: filtered = [x for x in cls.all_engines() if x['engine'] == engine and x['system_os'] == system_os and x['cpu'] == cpu] @@ -89,21 +88,13 @@ class EngineManager: @classmethod def is_version_downloaded(cls, engine, version, system_os=None, cpu=None): - system_os = system_os or cls.system_os() - cpu = cpu or cls.system_cpu() + system_os = system_os or 
current_system_os() + cpu = cpu or current_system_cpu() filtered = [x for x in cls.all_engines() if x['engine'] == engine and x['system_os'] == system_os and x['cpu'] == cpu and x['version'] == version] return filtered[0] if filtered else False - @staticmethod - def system_os(): - return platform.system().lower().replace('darwin', 'macos') - - @staticmethod - def system_cpu(): - return platform.machine().lower().replace('amd64', 'x64') - @classmethod def version_is_available_to_download(cls, engine, version, system_os=None, cpu=None): try: @@ -158,10 +149,15 @@ class EngineManager: def update_all_engines(cls): logger.info(f"Checking for updates for render engines...") for engine, engine_downloader in cls.downloader_classes.items(): - latest_version = engine_downloader.find_most_recent_version().get('version') - if latest_version and not cls.is_version_downloaded(engine, latest_version): - logger.info(f"Downloading newest version of {engine} ({latest_version})") - cls.download_engine(engine, latest_version) + logger.debug(f"Checking for updates to {engine}") + latest_version = engine_downloader.find_most_recent_version() + if latest_version: + logger.debug(f"Latest version of {engine} available: {latest_version.get('version')}") + if not cls.is_version_downloaded(engine, latest_version.get('version')): + logger.info(f"Downloading {engine} ({latest_version['version']})") + cls.download_engine(engine=engine, version=latest_version['version']) + else: + logger.warning(f"Unable to get latest version for {engine}") if __name__ == '__main__': @@ -169,3 +165,4 @@ if __name__ == '__main__': # print(EngineManager.newest_engine_version('blender', 'macos', 'arm64')) EngineManager.delete_engine_download('blender', '3.2.1', 'macos', 'a') + diff --git a/src/engines/ffmpeg/ffmpeg_downloader.py b/src/engines/ffmpeg/ffmpeg_downloader.py index dff031a..f539a8e 100644 --- a/src/engines/ffmpeg/ffmpeg_downloader.py +++ b/src/engines/ffmpeg/ffmpeg_downloader.py @@ -23,18 +23,34 @@ class 
FFMPEGDownloader: windows_download_url = "https://github.com/GyanD/codexffmpeg/releases/download/" windows_api_url = "https://api.github.com/repos/GyanD/codexffmpeg/releases" + # used to cache renderer versions in case they need to be accessed frequently + version_cache = {} + @classmethod - def __get_macos_versions(cls): + def __get_macos_versions(cls, use_cache=True): + + # cache the versions locally + version_cache = cls.version_cache.get('macos') + if version_cache and use_cache: + return version_cache + response = requests.get(cls.macos_url, timeout=5) response.raise_for_status() link_pattern = r'>(.*\.zip)[^\.]' link_matches = re.findall(link_pattern, response.text) - return [link.split('-')[-1].split('.zip')[0] for link in link_matches] + releases = [link.split('-')[-1].split('.zip')[0] for link in link_matches] + cls.version_cache['macos'] = releases + return releases @classmethod - def __get_linux_versions(cls): + def __get_linux_versions(cls, use_cache=True): + + # cache the versions locally + version_cache = cls.version_cache.get('linux') + if version_cache and use_cache: + return version_cache # Link 1 / 2 - Current Version response = requests.get(cls.linux_url, timeout=5) @@ -47,19 +63,29 @@ class FFMPEGDownloader: releases = list(set(re.findall(r'href="ffmpeg-([\w\.]+)-.*">ffmpeg', response.text))) releases.sort(reverse=True) releases.insert(0, current_release) + + # Add to cache + cls.version_cache['linux'] = releases return releases @classmethod - def __get_windows_versions(cls): + def __get_windows_versions(cls, use_cache=True): + + version_cache = cls.version_cache.get('windows') + if version_cache and use_cache: + return version_cache + response = requests.get(cls.windows_api_url, timeout=5) response.raise_for_status() - versions = [] + releases = [] all_git_releases = response.json() for item in all_git_releases: if re.match(r'^[0-9.]+$', item['tag_name']): - versions.append(item['tag_name']) - return versions + releases.append(item['tag_name']) 
+ + cls.version_cache['linux'] = releases + return releases @classmethod def find_most_recent_version(cls, system_os=None, cpu=None, lts_only=False): @@ -75,7 +101,8 @@ class FFMPEGDownloader: def all_versions(cls, system_os=None, cpu=None): system_os = system_os or current_system_os() cpu = cpu or current_system_cpu() - versions_per_os = {'linux': cls.__get_linux_versions, 'macos': cls.__get_macos_versions, 'windows': cls.__get_windows_versions} + versions_per_os = {'linux': cls.__get_linux_versions, 'macos': cls.__get_macos_versions, + 'windows': cls.__get_windows_versions} if not versions_per_os.get(system_os): logger.error(f"Cannot find version list for {system_os}") return @@ -83,9 +110,9 @@ class FFMPEGDownloader: results = [] all_versions = versions_per_os[system_os]() for version in all_versions: - remote_url = cls.__get_remote_url_for_version(version, system_os, cpu) + remote_url = cls.__get_remote_url_for_version(version=version, system_os=system_os, cpu=cpu) results.append({'cpu': cpu, 'file': os.path.basename(remote_url), 'system_os': system_os, 'url': remote_url, - 'version': version}) + 'version': version}) return results @classmethod @@ -102,8 +129,12 @@ class FFMPEGDownloader: if system_os == 'macos': remote_url = os.path.join(cls.macos_url, f"ffmpeg-{version}.zip") elif system_os == 'linux': - release_dir = 'releases' if version == cls.__get_linux_versions()[0] else 'old-releases' - remote_url = os.path.join(cls.linux_url, release_dir, f'ffmpeg-{version}-{cpu}-static.tar.xz') + cpu = cpu.replace('x64', 'amd64') # change cpu to match repo naming convention + latest_release = (version == cls.__get_linux_versions(use_cache=True)[0]) + release_dir = 'releases' if latest_release else 'old-releases' + release_filename = f'ffmpeg-release-{cpu}-static.tar.xz' if latest_release else \ + f'ffmpeg-{version}-{cpu}-static.tar.xz' + remote_url = os.path.join(cls.linux_url, release_dir, release_filename) elif system_os == 'windows': remote_url = 
f"{cls.windows_download_url.strip('/')}/{version}/ffmpeg-{version}-full_build.zip" else: @@ -113,14 +144,16 @@ class FFMPEGDownloader: @classmethod def download_engine(cls, version, download_location, system_os=None, cpu=None, timeout=120): system_os = system_os or current_system_os() - cpu = cpu or current_system_os() + cpu = cpu or current_system_cpu() # Verify requested version is available - if version not in cls.all_versions(system_os): - logger.error(f"Cannot find FFMPEG version {version} for {system_os}") + found_version = [item for item in cls.all_versions(system_os, cpu) if item['version'] == version] + if not found_version: + logger.error(f"Cannot find FFMPEG version {version} for {system_os} and {cpu}") + return # Platform specific naming cleanup - remote_url = cls.__get_remote_url_for_version(version, system_os, cpu) + remote_url = cls.__get_remote_url_for_version(version=version, system_os=system_os, cpu=cpu) if system_os == 'macos': # override location to match linux download_location = os.path.join(download_location, f'ffmpeg-{version}-{system_os}-{cpu}') @@ -130,18 +163,21 @@ class FFMPEGDownloader: download_and_extract_app(remote_url=remote_url, download_location=download_location, timeout=timeout) # naming cleanup to match existing naming convention + output_path = os.path.join(download_location, f'ffmpeg-{version}-{system_os}-{cpu}') if system_os == 'linux': - os.rename(os.path.join(download_location, f'ffmpeg-{version}-{cpu}-static'), - os.path.join(download_location, f'ffmpeg-{version}-{system_os}-{cpu}')) + initial_cpu = cpu.replace('x64', 'amd64') # change cpu to match repo naming convention + os.rename(os.path.join(download_location, f'ffmpeg-{version}-{initial_cpu}-static'), output_path) elif system_os == 'windows': - os.rename(os.path.join(download_location, f'ffmpeg-{version}-full_build'), - os.path.join(download_location, f'ffmpeg-{version}-{system_os}-{cpu}')) - - except IndexError: - logger.error("Cannot download requested engine") + 
os.rename(os.path.join(download_location, f'ffmpeg-{version}-full_build'), output_path) + return output_path + except (IndexError, FileNotFoundError) as e: + logger.error(f"Cannot download requested engine: {e}") + except OSError as e: + logger.error(f"OS error while processing engine download: {e}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') # print(FFMPEGDownloader.download_engine('6.0', '/Users/brett/zordon-uploads/engines/')) - print(FFMPEGDownloader.download_engine(version='6.0', download_location='/Users/brett/zordon-uploads/engines/')) \ No newline at end of file + # print(FFMPEGDownloader.find_most_recent_version(system_os='linux')) + print(FFMPEGDownloader.download_engine(version='6.0', download_location='/Users/brett/zordon-uploads/engines/', system_os='linux', cpu='x64')) \ No newline at end of file diff --git a/src/utilities/misc_helper.py b/src/utilities/misc_helper.py index edda562..8e42da9 100644 --- a/src/utilities/misc_helper.py +++ b/src/utilities/misc_helper.py @@ -116,5 +116,10 @@ def current_system_os(): return platform.system().lower().replace('darwin', 'macos') +def current_system_os_version(): + return platform.mac_ver()[0] if current_system_os() == 'macos' else platform.release().lower() + + def current_system_cpu(): - return platform.machine().lower().replace('amd64', 'x64') + # convert all x86 64 to "x64" + return platform.machine().lower().replace('amd64', 'x64').replace('x86_64', 'x64')