Split add job helper (#45)

* Fix issue with engine not being available to download

* Add version caches to ffmpeg downloader

* Remove some test parameters

* "releases" should be "release" in linux ffmpeg url

* CPU was incorrectly reported as OS

* Fix naming structure for FFMPEG downloads for linux

* More linux ffmpeg work

* Improved error handling

* WIP

* Consolidate platform reporting to not use platform directly

* Fix missing folder name

* Fix project output naming

* Undo config.yaml commit

* Add is_engine_available API call

* Fix issue where subjobs would not find servers
This commit is contained in:
2023-10-25 02:49:07 -05:00
committed by GitHub
parent 03e7b95e1b
commit f01192909d
9 changed files with 130 additions and 76 deletions

View File

@@ -6,4 +6,5 @@ flask_log_level: error
flask_debug_enable: false
queue_eval_seconds: 1
port_number: 8080
enable_split_jobs: true
enable_split_jobs: true
download_timeout_seconds: 120

View File

@@ -44,22 +44,22 @@ def handle_uploaded_project_files(request, jobs_list, upload_directory):
job_dir = os.path.join(upload_directory, '-'.join(
[datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), renderer, cleaned_path_name]))
os.makedirs(job_dir, exist_ok=True)
upload_dir = os.path.join(job_dir, 'source')
os.makedirs(upload_dir, exist_ok=True)
project_source_dir = os.path.join(job_dir, 'source')
os.makedirs(project_source_dir, exist_ok=True)
# Move projects to their work directories
if uploaded_project and uploaded_project.filename:
loaded_project_local_path = os.path.join(upload_dir, secure_filename(uploaded_project.filename))
loaded_project_local_path = os.path.join(project_source_dir, secure_filename(uploaded_project.filename))
uploaded_project.save(loaded_project_local_path)
logger.info(f"Transfer complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}")
logger.info(f"Transfer complete for {loaded_project_local_path.split(upload_directory)[-1]}")
elif project_url:
loaded_project_local_path = os.path.join(upload_dir, referred_name)
loaded_project_local_path = os.path.join(project_source_dir, referred_name)
shutil.move(downloaded_file_url, loaded_project_local_path)
logger.info(f"Download complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}")
logger.info(f"Download complete for {loaded_project_local_path.split(upload_directory)[-1]}")
elif local_path:
loaded_project_local_path = os.path.join(upload_dir, referred_name)
loaded_project_local_path = os.path.join(project_source_dir, referred_name)
shutil.copy(local_path, loaded_project_local_path)
logger.info(f"Import complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}")
logger.info(f"Import complete for {loaded_project_local_path.split(upload_directory)[-1]}")
return loaded_project_local_path, referred_name
@@ -137,14 +137,21 @@ def create_render_jobs(jobs_list, loaded_project_local_path, job_dir, enable_spl
os.makedirs(output_dir, exist_ok=True)
# get new output path in output_dir
job_data['output_path'] = os.path.join(output_dir, os.path.basename(
job_data.get('output_path', None) or loaded_project_local_path
))
output_path = job_data.get('output_path')
if not output_path:
loaded_project_filename = os.path.basename(loaded_project_local_path)
output_filename = os.path.splitext(loaded_project_filename)[0]
else:
output_filename = os.path.basename(output_path)
output_path = os.path.join(os.path.dirname(os.path.dirname(loaded_project_local_path)), 'output',
output_filename)
logger.debug(f"New job output path: {output_path}")
# create & configure jobs
worker = RenderWorkerFactory.create_worker(renderer=job_data['renderer'],
input_path=loaded_project_local_path,
output_path=job_data["output_path"],
output_path=output_path,
engine_version=job_data.get('engine_version'),
args=job_data.get('args', {}))
worker.status = job_data.get("initial_status", worker.status)
@@ -157,6 +164,8 @@ def create_render_jobs(jobs_list, loaded_project_local_path, job_dir, enable_spl
# determine if we can / should split the job
if enable_split_jobs and (worker.total_frames > 1) and not worker.parent:
DistributedJobManager.split_into_subjobs(worker, job_data, loaded_project_local_path)
else:
logger.debug("Not splitting into subjobs")
RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
if not worker.parent:

View File

@@ -3,7 +3,6 @@ import json
import logging
import os
import pathlib
import platform
import shutil
import socket
import ssl
@@ -25,7 +24,7 @@ from src.engines.core.base_worker import string_to_status, RenderStatus
from src.engines.core.worker_factory import RenderWorkerFactory
from src.engines.engine_manager import EngineManager
from src.render_queue import RenderQueue, JobNotFoundError
from src.utilities.misc_helper import system_safe_path
from src.utilities.misc_helper import system_safe_path, current_system_os, current_system_cpu, current_system_os_version
from src.utilities.server_helper import generate_thumbnail_for_job
from src.utilities.zeroconf_server import ZeroconfServer
@@ -399,14 +398,10 @@ def status():
}
# Get system info
system_platform = platform.system().lower().replace('darwin', 'macos')
system_platform_version = platform.mac_ver()[0] if system_platform == 'macos' else platform.release().lower()
system_cpu = platform.machine().lower().replace('amd64', 'x64')
return {"timestamp": datetime.now().isoformat(),
"system_platform": system_platform,
"system_platform_version": system_platform_version,
"system_cpu": system_cpu,
"system_os": current_system_os(),
"system_os_version": current_system_os_version(),
"system_cpu": current_system_cpu(),
"cpu_percent": psutil.cpu_percent(percpu=False),
"cpu_percent_per_cpu": psutil.cpu_percent(percpu=True),
"cpu_count": psutil.cpu_count(logical=False),
@@ -437,6 +432,14 @@ def renderer_info():
return renderer_data
@server.get('/api/<engine_name>/is_available')
def is_engine_available(engine_name):
return {'engine': engine_name, 'available': RenderQueue.is_available_for_job(engine_name),
'cpu_count': int(psutil.cpu_count(logical=False)),
'versions': EngineManager.all_versions_for_engine(engine_name),
'hostname': server.config['HOSTNAME']}
@server.get('/api/is_engine_available_to_download')
def is_engine_available_to_download():
available_result = EngineManager.version_is_available_to_download(request.args.get('engine'),

View File

@@ -116,6 +116,9 @@ class RenderServerProxy:
def get_status(self):
return self.request_data('status')
def is_engine_available(self, engine_name):
return self.request_data(f'{engine_name}/is_available')
def notify_parent_of_status_change(self, parent_id, subjob):
return requests.post(f'http://{self.hostname}:{self.port}/api/job/{parent_id}/notify_parent_of_status_change',
json=subjob.json())

View File

@@ -195,6 +195,7 @@ class DistributedJobManager:
# Check availability
available_servers = cls.find_available_servers(worker.renderer)
logger.debug(f"Splitting into subjobs - Available servers: {available_servers}")
subjob_servers = cls.distribute_server_work(worker.start_frame, worker.end_frame, available_servers)
local_hostname = socket.gethostname()
@@ -323,17 +324,17 @@ class DistributedJobManager:
return server_breakdown
@staticmethod
def find_available_servers(renderer):
def find_available_servers(engine_name):
"""
Scan the Zeroconf network for currently available render servers supporting a specific renderer.
Scan the Zeroconf network for currently available render servers supporting a specific engine.
:param renderer: str, The renderer type to search for
:param engine_name: str, The engine type to search for
:return: A list of dictionaries with each dict containing hostname and cpu_count of available servers
"""
available_servers = []
for hostname in ZeroconfServer.found_clients():
response = RenderServerProxy(hostname).get_status()
if response and response.get('renderers', {}).get(renderer, {}).get('is_available', False):
available_servers.append({'hostname': hostname, 'cpu_count': int(response['cpu_count'])})
response = RenderServerProxy(hostname).is_engine_available(engine_name)
if response and response.get('available', False):
available_servers.append(response)
return available_servers

View File

@@ -1,5 +1,4 @@
import logging
import platform
import re
import requests
@@ -96,8 +95,8 @@ class BlenderDownloader:
@classmethod
def download_engine(cls, version, download_location, system_os=None, cpu=None, timeout=120):
system_os = system_os or platform.system().lower().replace('darwin', 'macos')
cpu = cpu or platform.machine().lower().replace('amd64', 'x64')
system_os = system_os or current_system_os()
cpu = cpu or current_system_cpu()
try:
logger.info(f"Requesting download of blender-{version}-{system_os}-{cpu}")

View File

@@ -1,13 +1,12 @@
import logging
import os
import platform
import shutil
from src.engines.blender.blender_downloader import BlenderDownloader
from src.engines.blender.blender_engine import Blender
from src.engines.ffmpeg.ffmpeg_downloader import FFMPEGDownloader
from src.engines.ffmpeg.ffmpeg_engine import FFMPEG
from src.utilities.misc_helper import system_safe_path
from src.utilities.misc_helper import system_safe_path, current_system_os, current_system_cpu
logger = logging.getLogger()
@@ -64,8 +63,8 @@ class EngineManager:
for eng in cls.supported_engines():
if eng.default_renderer_path():
results.append({'engine': eng.name(), 'version': eng().version(),
'system_os': cls.system_os(),
'cpu': cls.system_cpu(),
'system_os': current_system_os(),
'cpu': current_system_cpu(),
'path': eng.default_renderer_path(), 'type': 'system'})
return results
@@ -76,8 +75,8 @@ class EngineManager:
@classmethod
def newest_engine_version(cls, engine, system_os=None, cpu=None):
system_os = system_os or cls.system_os()
cpu = cpu or cls.system_cpu()
system_os = system_os or current_system_os()
cpu = cpu or current_system_cpu()
try:
filtered = [x for x in cls.all_engines() if x['engine'] == engine and x['system_os'] == system_os and x['cpu'] == cpu]
@@ -89,21 +88,13 @@ class EngineManager:
@classmethod
def is_version_downloaded(cls, engine, version, system_os=None, cpu=None):
system_os = system_os or cls.system_os()
cpu = cpu or cls.system_cpu()
system_os = system_os or current_system_os()
cpu = cpu or current_system_cpu()
filtered = [x for x in cls.all_engines() if
x['engine'] == engine and x['system_os'] == system_os and x['cpu'] == cpu and x['version'] == version]
return filtered[0] if filtered else False
@staticmethod
def system_os():
return platform.system().lower().replace('darwin', 'macos')
@staticmethod
def system_cpu():
return platform.machine().lower().replace('amd64', 'x64')
@classmethod
def version_is_available_to_download(cls, engine, version, system_os=None, cpu=None):
try:
@@ -158,10 +149,15 @@ class EngineManager:
def update_all_engines(cls):
logger.info(f"Checking for updates for render engines...")
for engine, engine_downloader in cls.downloader_classes.items():
latest_version = engine_downloader.find_most_recent_version().get('version')
if latest_version and not cls.is_version_downloaded(engine, latest_version):
logger.info(f"Downloading newest version of {engine} ({latest_version})")
cls.download_engine(engine, latest_version)
logger.debug(f"Checking for updates to {engine}")
latest_version = engine_downloader.find_most_recent_version()
if latest_version:
logger.debug(f"Latest version of {engine} available: {latest_version.get('version')}")
if not cls.is_version_downloaded(engine, latest_version.get('version')):
logger.info(f"Downloading {engine} ({latest_version['version']})")
cls.download_engine(engine=engine, version=latest_version['version'])
else:
logger.warning(f"Unable to get latest version for {engine}")
if __name__ == '__main__':
@@ -169,3 +165,4 @@ if __name__ == '__main__':
# print(EngineManager.newest_engine_version('blender', 'macos', 'arm64'))
EngineManager.delete_engine_download('blender', '3.2.1', 'macos', 'a')

View File

@@ -23,18 +23,34 @@ class FFMPEGDownloader:
windows_download_url = "https://github.com/GyanD/codexffmpeg/releases/download/"
windows_api_url = "https://api.github.com/repos/GyanD/codexffmpeg/releases"
# used to cache renderer versions in case they need to be accessed frequently
version_cache = {}
@classmethod
def __get_macos_versions(cls):
def __get_macos_versions(cls, use_cache=True):
# cache the versions locally
version_cache = cls.version_cache.get('macos')
if version_cache and use_cache:
return version_cache
response = requests.get(cls.macos_url, timeout=5)
response.raise_for_status()
link_pattern = r'>(.*\.zip)[^\.]'
link_matches = re.findall(link_pattern, response.text)
return [link.split('-')[-1].split('.zip')[0] for link in link_matches]
releases = [link.split('-')[-1].split('.zip')[0] for link in link_matches]
cls.version_cache['macos'] = releases
return releases
@classmethod
def __get_linux_versions(cls):
def __get_linux_versions(cls, use_cache=True):
# cache the versions locally
version_cache = cls.version_cache.get('linux')
if version_cache and use_cache:
return version_cache
# Link 1 / 2 - Current Version
response = requests.get(cls.linux_url, timeout=5)
@@ -47,19 +63,29 @@ class FFMPEGDownloader:
releases = list(set(re.findall(r'href="ffmpeg-([\w\.]+)-.*">ffmpeg', response.text)))
releases.sort(reverse=True)
releases.insert(0, current_release)
# Add to cache
cls.version_cache['linux'] = releases
return releases
@classmethod
def __get_windows_versions(cls):
def __get_windows_versions(cls, use_cache=True):
version_cache = cls.version_cache.get('windows')
if version_cache and use_cache:
return version_cache
response = requests.get(cls.windows_api_url, timeout=5)
response.raise_for_status()
versions = []
releases = []
all_git_releases = response.json()
for item in all_git_releases:
if re.match(r'^[0-9.]+$', item['tag_name']):
versions.append(item['tag_name'])
return versions
releases.append(item['tag_name'])
cls.version_cache['linux'] = releases
return releases
@classmethod
def find_most_recent_version(cls, system_os=None, cpu=None, lts_only=False):
@@ -75,7 +101,8 @@ class FFMPEGDownloader:
def all_versions(cls, system_os=None, cpu=None):
system_os = system_os or current_system_os()
cpu = cpu or current_system_cpu()
versions_per_os = {'linux': cls.__get_linux_versions, 'macos': cls.__get_macos_versions, 'windows': cls.__get_windows_versions}
versions_per_os = {'linux': cls.__get_linux_versions, 'macos': cls.__get_macos_versions,
'windows': cls.__get_windows_versions}
if not versions_per_os.get(system_os):
logger.error(f"Cannot find version list for {system_os}")
return
@@ -83,9 +110,9 @@ class FFMPEGDownloader:
results = []
all_versions = versions_per_os[system_os]()
for version in all_versions:
remote_url = cls.__get_remote_url_for_version(version, system_os, cpu)
remote_url = cls.__get_remote_url_for_version(version=version, system_os=system_os, cpu=cpu)
results.append({'cpu': cpu, 'file': os.path.basename(remote_url), 'system_os': system_os, 'url': remote_url,
'version': version})
'version': version})
return results
@classmethod
@@ -102,8 +129,12 @@ class FFMPEGDownloader:
if system_os == 'macos':
remote_url = os.path.join(cls.macos_url, f"ffmpeg-{version}.zip")
elif system_os == 'linux':
release_dir = 'releases' if version == cls.__get_linux_versions()[0] else 'old-releases'
remote_url = os.path.join(cls.linux_url, release_dir, f'ffmpeg-{version}-{cpu}-static.tar.xz')
cpu = cpu.replace('x64', 'amd64') # change cpu to match repo naming convention
latest_release = (version == cls.__get_linux_versions(use_cache=True)[0])
release_dir = 'releases' if latest_release else 'old-releases'
release_filename = f'ffmpeg-release-{cpu}-static.tar.xz' if latest_release else \
f'ffmpeg-{version}-{cpu}-static.tar.xz'
remote_url = os.path.join(cls.linux_url, release_dir, release_filename)
elif system_os == 'windows':
remote_url = f"{cls.windows_download_url.strip('/')}/{version}/ffmpeg-{version}-full_build.zip"
else:
@@ -113,14 +144,16 @@ class FFMPEGDownloader:
@classmethod
def download_engine(cls, version, download_location, system_os=None, cpu=None, timeout=120):
system_os = system_os or current_system_os()
cpu = cpu or current_system_os()
cpu = cpu or current_system_cpu()
# Verify requested version is available
if version not in cls.all_versions(system_os):
logger.error(f"Cannot find FFMPEG version {version} for {system_os}")
found_version = [item for item in cls.all_versions(system_os, cpu) if item['version'] == version]
if not found_version:
logger.error(f"Cannot find FFMPEG version {version} for {system_os} and {cpu}")
return
# Platform specific naming cleanup
remote_url = cls.__get_remote_url_for_version(version, system_os, cpu)
remote_url = cls.__get_remote_url_for_version(version=version, system_os=system_os, cpu=cpu)
if system_os == 'macos': # override location to match linux
download_location = os.path.join(download_location, f'ffmpeg-{version}-{system_os}-{cpu}')
@@ -130,18 +163,21 @@ class FFMPEGDownloader:
download_and_extract_app(remote_url=remote_url, download_location=download_location, timeout=timeout)
# naming cleanup to match existing naming convention
output_path = os.path.join(download_location, f'ffmpeg-{version}-{system_os}-{cpu}')
if system_os == 'linux':
os.rename(os.path.join(download_location, f'ffmpeg-{version}-{cpu}-static'),
os.path.join(download_location, f'ffmpeg-{version}-{system_os}-{cpu}'))
initial_cpu = cpu.replace('x64', 'amd64') # change cpu to match repo naming convention
os.rename(os.path.join(download_location, f'ffmpeg-{version}-{initial_cpu}-static'), output_path)
elif system_os == 'windows':
os.rename(os.path.join(download_location, f'ffmpeg-{version}-full_build'),
os.path.join(download_location, f'ffmpeg-{version}-{system_os}-{cpu}'))
except IndexError:
logger.error("Cannot download requested engine")
os.rename(os.path.join(download_location, f'ffmpeg-{version}-full_build'), output_path)
return output_path
except (IndexError, FileNotFoundError) as e:
logger.error(f"Cannot download requested engine: {e}")
except OSError as e:
logger.error(f"OS error while processing engine download: {e}")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# print(FFMPEGDownloader.download_engine('6.0', '/Users/brett/zordon-uploads/engines/'))
print(FFMPEGDownloader.download_engine(version='6.0', download_location='/Users/brett/zordon-uploads/engines/'))
# print(FFMPEGDownloader.find_most_recent_version(system_os='linux'))
print(FFMPEGDownloader.download_engine(version='6.0', download_location='/Users/brett/zordon-uploads/engines/', system_os='linux', cpu='x64'))

View File

@@ -116,5 +116,10 @@ def current_system_os():
return platform.system().lower().replace('darwin', 'macos')
def current_system_os_version():
return platform.mac_ver()[0] if current_system_os() == 'macos' else platform.release().lower()
def current_system_cpu():
return platform.machine().lower().replace('amd64', 'x64')
# convert all x86 64 to "x64"
return platform.machine().lower().replace('amd64', 'x64').replace('x86_64', 'x64')