33 Commits

Author SHA1 Message Date
Brett Williams
298a2ccc64 Merge remote-tracking branch 'origin/#24_generate_new_subjobs_on_error' into #24_generate_new_subjobs_on_error 2023-10-26 04:22:09 -05:00
Brett Williams
496e5f78a5 Split_into_subjobs WIP 2023-10-26 04:21:52 -05:00
Brett Williams
c1e5fd1129 Updated wait_for_subjobs 2023-10-26 04:21:52 -05:00
Brett Williams
f6073b2954 Add last connected to server_proxy.py 2023-10-26 04:21:52 -05:00
Brett Williams
cc1d6ba452 Misc cleanup 2023-10-26 04:21:52 -05:00
Brett Williams
a5e9ac0014 wait_for_subjobs rewrite 2023-10-26 04:21:52 -05:00
Brett Williams
1d44716a1f Added new_create_subjob method 2023-10-26 04:21:52 -05:00
Brett Williams
ba81be7088 Move the current_frame attribute to base_worker.py 2023-10-26 04:21:52 -05:00
Brett Williams
8574486443 Missed a line 2023-10-26 04:21:52 -05:00
Brett Williams
fca2a9f441 Fix issue where subjobs were not updating parent job json 2023-10-26 04:21:52 -05:00
Brett Williams
0730b20c52 Added two stubs for methods needed for dynamic subjob generation 2023-10-26 04:21:51 -05:00
Brett Williams
deac943e4c Add UI notifications to a try block 2023-10-25 21:52:08 -05:00
Brett Williams
80ffda8447 Split_into_subjobs WIP 2023-10-25 19:00:41 -05:00
Brett Williams
3b975418de Updated wait_for_subjobs 2023-10-25 18:59:01 -05:00
Brett Williams
b646c1f848 Add last connected to server_proxy.py 2023-10-25 18:54:36 -05:00
Brett Williams
0fe50bc175 Misc cleanup 2023-10-25 18:53:41 -05:00
Brett Williams
fa0bdf807f wait_for_subjobs rewrite 2023-10-25 15:54:14 -05:00
Brett Williams
5b102a5ea4 Added new_create_subjob method 2023-10-25 15:07:17 -05:00
Brett Williams
006a97a17a Move the current_frame attribute to base_worker.py 2023-10-25 13:41:14 -05:00
Brett Williams
3e567060f8 Missed a line 2023-10-25 11:47:12 -05:00
Brett Williams
7dff2e3393 Fix issue where subjobs were not updating parent job json 2023-10-25 11:46:09 -05:00
Brett Williams
0f4a9b5ddd Added two stubs for methods needed for dynamic subjob generation 2023-10-25 07:28:49 -05:00
Brett Williams
32d863f624 Cancel all children if parent job is cancelled 2023-10-25 06:37:46 -05:00
Brett Williams
760d239d0c Minor fixes 2023-10-25 06:29:30 -05:00
cc1cf92118 Add simple notifications on job errors, completion and start. (#46) 2023-10-25 06:15:52 -05:00
Brett Williams
917a15c60c Make engine updating multithreaded 2023-10-25 02:57:25 -05:00
f01192909d Split add job helper (#45)
* Fix issue with engine not being available to download

* Add version caches to ffmpeg downloader

* Remove some test parameters

* "releases" should be "release" in linux ffmpeg url

* CPU was incorrectly reported as OS

* Fix naming structure for FFMPEG downloads for linux

* More linux ffmpeg work

* Improved error handling

* WIP

* Consolidate platform reporting to not use platform directly

* Fix missing folder name

* Fix project output naming

* Undo config.yaml commit

* Add is_engine_available API call

* Fix issue where subjobs would not find servers
2023-10-25 02:49:07 -05:00
Brett Williams
03e7b95e1b Split add_job_handler out to all_job_helpers.py 2023-10-24 20:53:06 -05:00
782a1a4699 Added engine update check on launch (#43) 2023-10-23 08:26:11 -05:00
e52682c8b9 Engine downloader API for #31 (#42)
* Add is_engine_available_to_download API call

* Fix issue with worker never throwing error if engine is not found

* Add API call to get most recent engine version

* Fix some minor import issues

* Fix web urls

* Fix default server log level

* Add progress bar for project download worker_factory downloads missing engine versions

* Better error handling when invalid version is given

* Add timeouts to engine downloaders
2023-10-22 17:02:30 -05:00
Brett Williams
9603046432 Fix path to blender pack_project.py 2023-10-21 22:48:15 -05:00
Brett Williams
9027cd7202 More code re-organizing 2023-10-21 22:45:30 -05:00
7a52cce40a Windows File Path Fixes (#39)
* Added system_safe_path to convert paths to Windows

* Fix issue where engines would not be reported unless a system engine was installed

* Platform independent searching for binaries in engine directory

* Add missing package to requirements.txt

* Better error handling for ffmpeg.get_all_formats()

* Add system_safe_path to more locations in api_server.py

* Fix naming issue with Blender on macos

* Fix path lookups and add engine_path to workers

* Report installed renderers in status

* Remove files included by accident
2023-10-21 22:12:09 -05:00
35 changed files with 947 additions and 549 deletions

View File

@@ -1,8 +1,10 @@
upload_folder: "~/zordon-uploads/"
update_engines_on_launch: true
max_content_path: 100000000
server_log_level: info
flask_log_level: error
flask_debug_enable: false
queue_eval_seconds: 1
port_number: 8080
enable_split_jobs: true
download_timeout_seconds: 120
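
These keys are read at startup with yaml.load (see the api_server.py hunk further down); a minimal sketch of how the new entries are consumed, with fallback defaults assumed where the diff does not show them:

import yaml

# Sketch only: key names come from this config.yaml; the defaults for the
# optional keys are assumptions based on the api_server.py diff below.
with open('config/config.yaml') as f:
    config = yaml.load(f, Loader=yaml.FullLoader)

update_on_launch = config.get('update_engines_on_launch', False)
split_jobs = config.get('enable_split_jobs', False)
download_timeout = config.get('download_timeout_seconds', 120)  # assumed default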

View File

@@ -1,4 +0,0 @@
if __name__ == "__main__":
print(FFMPEG.get_frame_count('/Users/brett/Desktop/Big_Fire_02.mov'))

View File

@@ -1,4 +1,5 @@
requests==2.31.0
requests_toolbelt==1.0.0
psutil==5.9.6
PyYAML==6.0.1
Flask==3.0.0
@@ -11,4 +12,6 @@ Pillow==10.1.0
zeroconf==0.119.0
Pypubsub~=4.0.3
tqdm==4.66.1
dmglib==0.9.4
plyer==2.1.0
pyobjus==1.2.3

184
src/api/add_job_helpers.py Normal file
View File

@@ -0,0 +1,184 @@
#!/usr/bin/env python3
import logging
import os
import shutil
import tempfile
import zipfile
from datetime import datetime
import requests
from tqdm import tqdm
from werkzeug.utils import secure_filename
from src.distributed_job_manager import DistributedJobManager
from src.engines.core.worker_factory import RenderWorkerFactory
from src.render_queue import RenderQueue
logger = logging.getLogger()
def handle_uploaded_project_files(request, jobs_list, upload_directory):
# Initialize default values
loaded_project_local_path = None
uploaded_project = request.files.get('file', None)
project_url = jobs_list[0].get('url', None)
local_path = jobs_list[0].get('local_path', None)
renderer = jobs_list[0].get('renderer')
downloaded_file_url = None
if uploaded_project and uploaded_project.filename:
referred_name = os.path.basename(uploaded_project.filename)
elif project_url:
referred_name, downloaded_file_url = download_project_from_url(project_url)
if not referred_name:
raise ValueError(f"Error downloading file from URL: {project_url}")
elif local_path and os.path.exists(local_path):
referred_name = os.path.basename(local_path)
else:
raise ValueError("Cannot find any valid project paths")
# Prepare the local filepath
cleaned_path_name = os.path.splitext(referred_name)[0].replace(' ', '_')
job_dir = os.path.join(upload_directory, '-'.join(
[datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), renderer, cleaned_path_name]))
os.makedirs(job_dir, exist_ok=True)
project_source_dir = os.path.join(job_dir, 'source')
os.makedirs(project_source_dir, exist_ok=True)
# Move projects to their work directories
if uploaded_project and uploaded_project.filename:
loaded_project_local_path = os.path.join(project_source_dir, secure_filename(uploaded_project.filename))
uploaded_project.save(loaded_project_local_path)
logger.info(f"Transfer complete for {loaded_project_local_path.split(upload_directory)[-1]}")
elif project_url:
loaded_project_local_path = os.path.join(project_source_dir, referred_name)
shutil.move(downloaded_file_url, loaded_project_local_path)
logger.info(f"Download complete for {loaded_project_local_path.split(upload_directory)[-1]}")
elif local_path:
loaded_project_local_path = os.path.join(project_source_dir, referred_name)
shutil.copy(local_path, loaded_project_local_path)
logger.info(f"Import complete for {loaded_project_local_path.split(upload_directory)[-1]}")
return loaded_project_local_path, referred_name
def download_project_from_url(project_url):
# Download the project file from a URL; returns (filename, local temp path)
logger.info(f"Downloading project from url: {project_url}")
referred_name = os.path.basename(project_url)
downloaded_file_url = None
try:
response = requests.get(project_url, stream=True)
if response.status_code == 200:
# Get the total file size from the "Content-Length" header
file_size = int(response.headers.get("Content-Length", 0))
# Create a progress bar using tqdm
progress_bar = tqdm(total=file_size, unit="B", unit_scale=True)
# Open a file for writing in binary mode
downloaded_file_url = os.path.join(tempfile.gettempdir(), referred_name)
with open(downloaded_file_url, "wb") as file:
for chunk in response.iter_content(chunk_size=1024):
if chunk:
# Write the chunk to the file
file.write(chunk)
# Update the progress bar
progress_bar.update(len(chunk))
# Close the progress bar
progress_bar.close()
else:
return None, None
except Exception as e:
logger.error(f"Error downloading file: {e}")
return None, None
return referred_name, downloaded_file_url
def process_zipped_project(zip_path):
# Given a zip path, extract its contents and return the main project file path
work_path = os.path.dirname(zip_path)
try:
with zipfile.ZipFile(zip_path, 'r') as myzip:
myzip.extractall(work_path)
project_files = [x for x in os.listdir(work_path) if os.path.isfile(os.path.join(work_path, x))]
project_files = [x for x in project_files if '.zip' not in x]
logger.debug(f"Zip files: {project_files}")
# supported_exts = RenderWorkerFactory.class_for_name(renderer).engine.supported_extensions
# if supported_exts:
# project_files = [file for file in project_files if any(file.endswith(ext) for ext in supported_exts)]
# If there's more than 1 project file or none, raise an error
if len(project_files) != 1:
raise ValueError(f'Cannot find a valid project file in {os.path.basename(zip_path)}')
extracted_project_path = os.path.join(work_path, project_files[0])
logger.info(f"Extracted zip file to {extracted_project_path}")
except (zipfile.BadZipFile, zipfile.LargeZipFile) as e:
logger.error(f"Error processing zip file: {e}")
raise ValueError(f"Error processing zip file: {e}")
return extracted_project_path
def create_render_jobs(jobs_list, loaded_project_local_path, job_dir, enable_split_jobs=False):
results = []
for job_data in jobs_list:
try:
# prepare output paths
output_dir = os.path.join(job_dir, job_data.get('name') if len(jobs_list) > 1 else 'output')
os.makedirs(output_dir, exist_ok=True)
# get new output path in output_dir
output_path = job_data.get('output_path')
if not output_path:
loaded_project_filename = os.path.basename(loaded_project_local_path)
output_filename = os.path.splitext(loaded_project_filename)[0]
else:
output_filename = os.path.basename(output_path)
output_path = os.path.join(os.path.dirname(os.path.dirname(loaded_project_local_path)), 'output',
output_filename)
logger.debug(f"New job output path: {output_path}")
# create & configure jobs
worker = RenderWorkerFactory.create_worker(renderer=job_data['renderer'],
input_path=loaded_project_local_path,
output_path=output_path,
engine_version=job_data.get('engine_version'),
args=job_data.get('args', {}))
worker.status = job_data.get("initial_status", worker.status)
worker.parent = job_data.get("parent", worker.parent)
worker.name = job_data.get("name", worker.name)
worker.priority = int(job_data.get('priority', worker.priority))
worker.start_frame = int(job_data.get("start_frame", worker.start_frame))
worker.end_frame = int(job_data.get("end_frame", worker.end_frame))
# determine if we can / should split the job
if enable_split_jobs and (worker.total_frames > 1) and not worker.parent:
DistributedJobManager.split_into_subjobs(worker, job_data, loaded_project_local_path)
else:
logger.debug("Not splitting into subjobs")
RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
if not worker.parent:
from src.api.api_server import make_job_ready
make_job_ready(worker.id)
results.append(worker.json())
except FileNotFoundError as e:
err_msg = f"Cannot create job: {e}"
logger.error(err_msg)
results.append({'error': err_msg})
except Exception as e:
err_msg = f"Exception creating render job: {e}"
logger.exception(err_msg)
results.append({'error': err_msg})
return results
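
For orientation, a hedged sketch of a client call into the /api/add_job endpoint these helpers back (host, port, and field values are illustrative; the endpoint accepts a single JSON object or a list, per the api_server.py hunk below):

import requests

# Illustrative only: field names are taken from the handlers above;
# the host/port and values are assumptions.
job = {
    'renderer': 'blender',
    'url': 'https://example.com/scene.blend',  # or 'local_path', or a multipart 'file'
    'name': 'test_render',
    'priority': 1,
    'args': {},
}
resp = requests.post('http://localhost:8080/api/add_job', json=[job])
print(resp.status_code, resp.json())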

View File

@@ -3,31 +3,30 @@ import json
import logging
import os
import pathlib
import platform
import shutil
import socket
import ssl
import tempfile
import threading
import time
import zipfile
from datetime import datetime
from urllib.request import urlretrieve
from zipfile import ZipFile
import json2html
import psutil
import yaml
from flask import Flask, request, render_template, send_file, after_this_request, Response, redirect, url_for, abort
from werkzeug.utils import secure_filename
from src.api.server_proxy import RenderServerProxy
from src.api.add_job_helpers import handle_uploaded_project_files, process_zipped_project, create_render_jobs
from src.distributed_job_manager import DistributedJobManager
from src.engines.core.base_worker import string_to_status, RenderStatus
from src.engines.core.worker_factory import RenderWorkerFactory
from src.engines.engine_manager import EngineManager
from src.render_queue import RenderQueue, JobNotFoundError
from src.server_proxy import RenderServerProxy
from src.utilities.misc_helper import system_safe_path, current_system_os, current_system_cpu, current_system_os_version
from src.utilities.server_helper import generate_thumbnail_for_job
from src.utilities.zeroconf_server import ZeroconfServer
from src.workers.worker_factory import RenderWorkerFactory
from src.workers.base_worker import string_to_status, RenderStatus
logger = logging.getLogger()
server = Flask(__name__, template_folder='web/templates', static_folder='web/static')
@@ -54,7 +53,7 @@ def sorted_jobs(all_jobs, sort_by_date=True):
@server.route('/')
@server.route('/index')
def index():
with open('config/presets.yaml') as f:
with open(system_safe_path('config/presets.yaml')) as f:
render_presets = yaml.load(f, Loader=yaml.FullLoader)
return render_template('index.html', all_jobs=sorted_jobs(RenderQueue.all_jobs()),
@@ -131,15 +130,15 @@ def job_thumbnail(job_id):
# Misc status icons
if found_job.status == RenderStatus.RUNNING:
return send_file('web/static/images/gears.png', mimetype="image/png")
return send_file('../web/static/images/gears.png', mimetype="image/png")
elif found_job.status == RenderStatus.CANCELLED:
return send_file('web/static/images/cancelled.png', mimetype="image/png")
return send_file('../web/static/images/cancelled.png', mimetype="image/png")
elif found_job.status == RenderStatus.SCHEDULED:
return send_file('web/static/images/scheduled.png', mimetype="image/png")
return send_file('../web/static/images/scheduled.png', mimetype="image/png")
elif found_job.status == RenderStatus.NOT_STARTED:
return send_file('web/static/images/not_started.png', mimetype="image/png")
return send_file('../web/static/images/not_started.png', mimetype="image/png")
# errors
return send_file('web/static/images/error.png', mimetype="image/png")
return send_file('../web/static/images/error.png', mimetype="image/png")
# Get job file routing
@@ -169,7 +168,7 @@ def subjob_status_change(job_id):
try:
subjob_details = request.json
logger.info(f"Subjob to job id: {job_id} is now {subjob_details['status']}")
DistributedJobManager.handle_subjob_status_change(RenderQueue.job_with_id(job_id), subjob_data=subjob_details)
DistributedJobManager.handle_subjob_status_change(job_id, subjob_data=subjob_details)
return Response(status=200)
except JobNotFoundError:
return "Job not found", 404
@@ -188,7 +187,7 @@ def get_job_status(job_id):
@server.get('/api/job/<job_id>/logs')
def get_job_logs(job_id):
found_job = RenderQueue.job_with_id(job_id)
log_path = found_job.log_path()
log_path = system_safe_path(found_job.log_path())
log_data = None
if log_path and os.path.exists(log_path):
with open(log_path) as file:
@@ -232,10 +231,11 @@ def download_all(job_id):
found_job = RenderQueue.job_with_id(job_id)
output_dir = os.path.dirname(found_job.output_path)
if os.path.exists(output_dir):
zip_filename = os.path.join('/tmp', pathlib.Path(found_job.input_path).stem + '.zip')
zip_filename = system_safe_path(os.path.join(tempfile.gettempdir(),
pathlib.Path(found_job.input_path).stem + '.zip'))
with ZipFile(zip_filename, 'w') as zipObj:
for f in os.listdir(output_dir):
zipObj.write(filename=os.path.join(output_dir, f),
zipObj.write(filename=system_safe_path(os.path.join(output_dir, f)),
arcname=os.path.basename(f))
return send_file(zip_filename, mimetype="zip", as_attachment=True, )
else:
@@ -244,7 +244,8 @@ def download_all(job_id):
@server.get('/api/presets')
def presets():
with open('config/presets.yaml') as f:
presets_path = system_safe_path('config/presets.yaml')
with open(presets_path) as f:
presets = yaml.load(f, Loader=yaml.FullLoader)
return presets
@@ -278,14 +279,17 @@ def detected_clients():
return ZeroconfServer.found_clients()
# New version
@server.post('/api/add_job')
def add_job_handler():
# initial handling of raw data
# Process request data
try:
if request.is_json:
jobs_list = [request.json] if not isinstance(request.json, list) else request.json
logger.debug(f"Received add_job JSON: {jobs_list}")
elif request.form.get('json', None):
jobs_list = json.loads(request.form['json'])
logger.debug(f"Received add_job form: {jobs_list}")
else:
# Clean up flat form data into a nested structure
form_dict = {k: v for k, v in dict(request.form).items() if v}
@@ -299,137 +303,27 @@ def add_job_handler():
args['raw'] = form_dict.get('raw_args', None)
form_dict['args'] = args
jobs_list = [form_dict]
logger.debug(f"Received add_job data: {jobs_list}")
except Exception as e:
err_msg = f"Error processing job data: {e}"
logger.error(err_msg)
return err_msg, 500
# start handling project files
try:
# handle uploaded files
logger.debug(f"Incoming new job request: {jobs_list}")
uploaded_project = request.files.get('file', None)
project_url = jobs_list[0].get('url', None)
local_path = jobs_list[0].get('local_path', None)
renderer = jobs_list[0].get('renderer')
loaded_project_local_path, referred_name = handle_uploaded_project_files(request, jobs_list,
server.config['UPLOAD_FOLDER'])
if loaded_project_local_path.lower().endswith('.zip'):
loaded_project_local_path = process_zipped_project(loaded_project_local_path)
downloaded_file_url = None
if uploaded_project and uploaded_project.filename:
referred_name = os.path.basename(uploaded_project.filename)
elif project_url:
# download and save url - have to download first to know filename due to redirects
logger.info(f"Attempting to download URL: {project_url}")
try:
downloaded_file_url, info = urlretrieve(project_url)
referred_name = info.get_filename() or os.path.basename(project_url)
except Exception as e:
err_msg = f"Error downloading file: {e}"
logger.error(err_msg)
return err_msg, 406
elif local_path and os.path.exists(local_path):
referred_name = os.path.basename(local_path)
else:
return "Cannot find any valid project paths", 400
# prep local filepath
cleaned_path_name = os.path.splitext(referred_name)[0].replace(' ', '_')
job_dir = os.path.join(server.config['UPLOAD_FOLDER'], '-'.join(
[datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), renderer, cleaned_path_name]))
os.makedirs(job_dir, exist_ok=True)
upload_dir = os.path.join(job_dir, 'source')
os.makedirs(upload_dir, exist_ok=True)
# move projects to their work directories
loaded_project_local_path = None
if uploaded_project and uploaded_project.filename:
loaded_project_local_path = os.path.join(upload_dir, secure_filename(uploaded_project.filename))
uploaded_project.save(loaded_project_local_path)
logger.info(f"Transfer complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}")
elif project_url:
loaded_project_local_path = os.path.join(upload_dir, referred_name)
shutil.move(downloaded_file_url, loaded_project_local_path)
logger.info(f"Download complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}")
elif local_path:
loaded_project_local_path = os.path.join(upload_dir, referred_name)
shutil.copy(local_path, loaded_project_local_path)
logger.info(f"Import complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}")
# process uploaded zip files
zip_path = loaded_project_local_path if loaded_project_local_path.lower().endswith('.zip') else None
if zip_path:
zip_path = loaded_project_local_path
work_path = os.path.dirname(zip_path)
try:
with zipfile.ZipFile(zip_path, 'r') as myzip:
myzip.extractall(os.path.dirname(zip_path))
project_files = [x for x in os.listdir(work_path) if os.path.isfile(os.path.join(work_path, x))]
project_files = [x for x in project_files if '.zip' not in x]
supported_exts = RenderWorkerFactory.class_for_name(renderer).engine.supported_extensions
if supported_exts:
project_files = [file for file in project_files if
any(file.endswith(ext) for ext in supported_exts)]
if len(project_files) != 1: # we have to narrow down to 1 main project file, otherwise error
return {'error': f'Cannot find valid project file in {os.path.basename(zip_path)}'}, 400
extracted_project_path = os.path.join(work_path, project_files[0])
logger.info(f"Extracted zip file to {extracted_project_path}")
loaded_project_local_path = extracted_project_path
except (zipfile.BadZipFile, zipfile.LargeZipFile) as e:
err_msg = f"Error processing zip file: {e}"
logger.error(err_msg)
return err_msg, 500
# create and add jobs to render queue
results = []
for job_data in jobs_list:
try:
# prepare output paths
output_dir = os.path.join(job_dir, job_data.get('name') if len(jobs_list) > 1 else 'output')
os.makedirs(output_dir, exist_ok=True)
# get new output path in output_dir
job_data['output_path'] = os.path.join(output_dir, os.path.basename(
job_data.get('output_path', None) or loaded_project_local_path
))
# create & configure jobs
worker = RenderWorkerFactory.create_worker(renderer=job_data['renderer'],
input_path=loaded_project_local_path,
output_path=job_data["output_path"],
args=job_data.get('args', {}))
worker.status = job_data.get("initial_status", worker.status)
worker.parent = job_data.get("parent", worker.parent)
worker.name = job_data.get("name", worker.name)
worker.priority = int(job_data.get('priority', worker.priority))
worker.start_frame = int(job_data.get("start_frame", worker.start_frame))
worker.end_frame = int(job_data.get("end_frame", worker.end_frame))
# determine if we can / should split the job
if server.config.get('enable_split_jobs', False) and (worker.total_frames > 1) and not worker.parent:
DistributedJobManager.split_into_subjobs(worker, job_data, zip_path or loaded_project_local_path)
RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
if not worker.parent:
make_job_ready(worker.id)
results.append(worker.json())
except Exception as e:
err_msg = f"Exception creating render job: {e}"
logger.exception(err_msg)
results.append({'error': err_msg})
# return any errors from results list
results = create_render_jobs(jobs_list, loaded_project_local_path,
os.path.dirname(os.path.dirname(loaded_project_local_path)),  # job_dir (the source dir's parent), not referred_name
server.config['enable_split_jobs'])
for response in results:
if response.get('error', None):
return results, 400
# redirect to index if requested
if request.args.get('redirect', False):
return redirect(url_for('index'))
else:
return results, 200
except Exception as e:
logger.exception(f"Unknown error adding job: {e}")
return 'unknown error', 500
@@ -500,21 +394,17 @@ def clear_history():
def status():
renderer_data = {}
for render_class in RenderWorkerFactory.supported_classes():
if render_class.engine.default_renderer_path(): # only return renderers installed on host
if EngineManager.all_versions_for_engine(render_class.name): # only return renderers installed on host
renderer_data[render_class.engine.name()] = \
{'versions': EngineManager.all_versions_for_engine(render_class.engine.name()),
'is_available': RenderQueue.is_available_for_job(render_class.engine.name())
}
# Get system info
system_platform = platform.system().lower().replace('darwin', 'macos')
system_platform_version = platform.mac_ver()[0] if system_platform == 'macos' else platform.release().lower()
system_cpu = platform.machine().lower().replace('amd64', 'x64')
return {"timestamp": datetime.now().isoformat(),
"system_platform": system_platform,
"system_platform_version": system_platform_version,
"system_cpu": system_cpu,
"system_os": current_system_os(),
"system_os_version": current_system_os_version(),
"system_cpu": current_system_cpu(),
"cpu_percent": psutil.cpu_percent(percpu=False),
"cpu_percent_per_cpu": psutil.cpu_percent(percpu=True),
"cpu_count": psutil.cpu_count(logical=False),
@@ -533,23 +423,53 @@ def renderer_info():
renderer_data = {}
for engine_name in RenderWorkerFactory.supported_renderers():
engine = RenderWorkerFactory.class_for_name(engine_name).engine
if engine.default_renderer_path():
# Get all installed versions of engine
installed_versions = EngineManager.all_versions_for_engine(engine_name)
if installed_versions:
install_path = installed_versions[0]['path']
renderer_data[engine_name] = {'is_available': RenderQueue.is_available_for_job(engine.name()),
'versions': EngineManager.all_versions_for_engine(engine_name),
'versions': installed_versions,
'supported_extensions': engine.supported_extensions,
'supported_export_formats': engine().get_output_formats()}
'supported_export_formats': engine(install_path).get_output_formats()}
return renderer_data
@server.get('/api/<engine_name>/is_available')
def is_engine_available(engine_name):
return {'engine': engine_name, 'available': RenderQueue.is_available_for_job(engine_name),
'cpu_count': int(psutil.cpu_count(logical=False)),
'versions': EngineManager.all_versions_for_engine(engine_name),
'hostname': server.config['HOSTNAME']}
@server.get('/api/is_engine_available_to_download')
def is_engine_available_to_download():
available_result = EngineManager.version_is_available_to_download(request.args.get('engine'),
request.args.get('version'),
request.args.get('system_os'),
request.args.get('cpu'))
return available_result if available_result else \
(f"Cannot find available download for {request.args.get('engine')} {request.args.get('version')}", 500)
@server.get('/api/find_most_recent_version')
def find_most_recent_version():
most_recent = EngineManager.find_most_recent_version(request.args.get('engine'),
request.args.get('system_os'),
request.args.get('cpu'))
return most_recent if most_recent else \
(f"Error finding most recent version of {request.args.get('engine')}", 500)
@server.post('/api/download_engine')
def download_engine():
download_result = EngineManager.download_engine(request.args.get('engine'),
request.args.get('version'),
request.args.get('system_os'),
request.args.get('cpu'))
return download_result if download_result else ("Error downloading requested engine", 500)
return download_result if download_result else \
(f"Error downloading {request.args.get('engine')} {request.args.get('version')}", 500)
@server.post('/api/delete_engine')
@@ -558,7 +478,8 @@ def delete_engine_download():
request.args.get('version'),
request.args.get('system_os'),
request.args.get('cpu'))
return "Success" if delete_result else ("Error deleting requested engine", 500)
return "Success" if delete_result else \
(f"Error deleting {request.args.get('engine')} {request.args.get('version')}", 500)
@server.get('/api/renderer/<renderer>/args')
@@ -581,7 +502,7 @@ def start_server(background_thread=False):
RenderQueue.evaluate_queue()
time.sleep(delay_sec)
with open('config/config.yaml') as f:
with open(system_safe_path('config/config.yaml')) as f:
config = yaml.load(f, Loader=yaml.FullLoader)
logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S',
@@ -594,19 +515,28 @@ def start_server(background_thread=False):
# load flask settings
server.config['HOSTNAME'] = local_hostname
server.config['PORT'] = int(config.get('port_number', 8080))
server.config['UPLOAD_FOLDER'] = os.path.expanduser(config['upload_folder'])
server.config['THUMBS_FOLDER'] = os.path.join(os.path.expanduser(config['upload_folder']), 'thumbs')
server.config['UPLOAD_FOLDER'] = system_safe_path(os.path.expanduser(config['upload_folder']))
server.config['THUMBS_FOLDER'] = system_safe_path(os.path.join(os.path.expanduser(config['upload_folder']), 'thumbs'))
server.config['MAX_CONTENT_PATH'] = config['max_content_path']
server.config['enable_split_jobs'] = config.get('enable_split_jobs', False)
# Setup directory for saving engines to
EngineManager.engines_path = os.path.join(os.path.join(os.path.expanduser(config['upload_folder']), 'engines'))
EngineManager.engines_path = system_safe_path(os.path.join(os.path.join(os.path.expanduser(config['upload_folder']), 'engines')))
os.makedirs(EngineManager.engines_path, exist_ok=True)
# Debug info
logger.debug(f"Upload directory: {server.config['UPLOAD_FOLDER']}")
logger.debug(f"Thumbs directory: {server.config['THUMBS_FOLDER']}")
logger.debug(f"Engines directory: {EngineManager.engines_path}")
# disable most Flask logging
flask_log = logging.getLogger('werkzeug')
flask_log.setLevel(config.get('flask_log_level', 'ERROR').upper())
# check for updates for render engines if config'd or on first launch
if config.get('update_engines_on_launch', False) or not EngineManager.all_engines():
EngineManager.update_all_engines()
# Set up the RenderQueue object
RenderQueue.start_queue()
DistributedJobManager.start()
@@ -614,7 +544,7 @@ def start_server(background_thread=False):
thread = threading.Thread(target=eval_loop, kwargs={'delay_sec': config.get('queue_eval_seconds', 1)}, daemon=True)
thread.start()
logging.info(f"Starting Zordon Render Server - Hostname: '{server.config['HOSTNAME']}:'")
logger.info(f"Starting Zordon Render Server - Hostname: '{server.config['HOSTNAME']}:'")
ZeroconfServer.configure("_zordon._tcp.local.", server.config['HOSTNAME'], server.config['PORT'])
ZeroconfServer.start()
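
A quick sketch of exercising the new engine endpoints added above (query parameter names come from the routes; the host, port, and values are assumptions):

import requests

base = 'http://localhost:8080/api'  # assumed host/port
params = {'engine': 'blender', 'version': '3.6.5',
          'system_os': 'macos', 'cpu': 'arm64'}  # illustrative values

# Check that a build exists before asking the server to fetch it
check = requests.get(f'{base}/is_engine_available_to_download', params=params)
if check.ok:
    requests.post(f'{base}/download_engine', params=params)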

View File

@@ -4,6 +4,7 @@ import os
import socket
import threading
import time
from datetime import datetime
import requests
from requests_toolbelt.multipart import MultipartEncoder, MultipartEncoderMonitor
@@ -33,6 +34,9 @@ class RenderServerProxy:
self.__background_thread = None
self.__offline_flags = 0
self.update_cadence = 5
self.last_contact = datetime.now()
# last_contact defaults to the time this instance was created; keep the instance
# alive to accurately measure the gap since the last successful contact
def connect(self):
status = self.request_data('status')
@@ -55,6 +59,7 @@ class RenderServerProxy:
req = self.request(payload, timeout)
if req.ok and req.status_code == 200:
self.__offline_flags = 0
self.last_contact = datetime.now()
return req.json()
except json.JSONDecodeError as e:
logger.debug(f"JSON decode error: {e}")
@@ -116,6 +121,9 @@ class RenderServerProxy:
def get_status(self):
return self.request_data('status')
def is_engine_available(self, engine_name):
return self.request_data(f'{engine_name}/is_available')
def notify_parent_of_status_change(self, parent_id, subjob):
return requests.post(f'http://{self.hostname}:{self.port}/api/job/{parent_id}/notify_parent_of_status_change',
json=subjob.json())
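
The new last_contact field feeds the stale-server check used in distributed_job_manager.py below; a minimal sketch of that pattern (the 12-second threshold mirrors last_connection_max_time in the manager hunk):

from datetime import datetime

MAX_SILENCE_SECONDS = 12  # mirrors last_connection_max_time below

def server_seems_offline(proxy):
    # proxy.last_contact is refreshed on every successful request_data() call,
    # so a growing gap means the server has stopped answering.
    return (datetime.now() - proxy.last_contact).seconds > MAX_SILENCE_SECONDS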

View File

@@ -11,10 +11,10 @@ from PIL import Image, ImageTk
from src.client.new_job_window import NewJobWindow
# from src.client.server_details import create_server_popup
from src.server_proxy import RenderServerProxy
from src.api.server_proxy import RenderServerProxy
from src.utilities.misc_helper import launch_url, file_exists_in_mounts, get_time_elapsed
from src.utilities.zeroconf_server import ZeroconfServer
from src.workers.base_worker import RenderStatus
from src.engines.core.base_worker import RenderStatus
logger = logging.getLogger()

View File

@@ -11,9 +11,9 @@ from tkinter.ttk import Frame, Label, Entry, Combobox, Progressbar
import psutil
from src.server_proxy import RenderServerProxy
from src.workers.blender_worker import Blender
from src.workers.ffmpeg_worker import FFMPEG
from src.api.server_proxy import RenderServerProxy
from src.engines.blender.blender_worker import Blender
from src.engines.ffmpeg.ffmpeg_worker import FFMPEG
logger = logging.getLogger()

View File

@@ -1,13 +1,15 @@
import datetime
import logging
import os
import socket
import time
import zipfile
from plyer import notification
from pubsub import pub
from src.api.server_proxy import RenderServerProxy
from src.render_queue import RenderQueue
from src.server_proxy import RenderServerProxy
from src.utilities.misc_helper import get_file_size_human
from src.utilities.status_utils import RenderStatus, string_to_status
from src.utilities.zeroconf_server import ZeroconfServer
@@ -51,43 +53,93 @@ class DistributedJobManager:
parent_id, hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1]
RenderServerProxy(hostname).notify_parent_of_status_change(parent_id=parent_id, subjob=render_job)
elif render_job.children and new_status == RenderStatus.CANCELLED:
# todo: handle cancelling all the children
pass
# handle cancelling all the children
elif render_job.children and new_status in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
for child in render_job.children:
child_id, hostname = child.split('@')
RenderServerProxy(hostname).cancel_job(child_id, confirm=True)
# UI Notifications
try:
if new_status == RenderStatus.COMPLETED:
logger.debug("show render complete notification")
notification.notify(
title='Render Job Complete',
message=f'{render_job.name} completed successfully',
timeout=10 # Display time in seconds
)
elif new_status == RenderStatus.ERROR:
logger.debug("show render complete notification")
notification.notify(
title='Render Job Failed',
message=f'{render_job.name} failed rendering',
timeout=10 # Display time in seconds
)
elif new_status == RenderStatus.RUNNING:
logger.debug("show render complete notification")
notification.notify(
title='Render Job Started',
message=f'{render_job.name} started rendering',
timeout=10 # Display time in seconds
)
except Exception as e:
logger.debug(f"Unable to show UI notification: {e}")
@classmethod
def handle_subjob_status_change(cls, local_job, subjob_data):
def handle_subjob_status_change(cls, parent_job_id, subjob_data):
"""
Responds to a status change from a remote subjob and triggers the creation or modification of subjobs as needed.
Parameters:
local_job (BaseRenderWorker): The local parent job worker.
subjob_data (dict): subjob data sent from remote server.
parent_job_id (str): ID of the local parent job worker.
subjob_data (dict): Subjob data sent from the remote server.
Returns:
None
"""
subjob_status = string_to_status(subjob_data['status'])
parent_job = RenderQueue.job_with_id(parent_job_id)
subjob_id = subjob_data['id']
subjob_hostname = next((hostname.split('@')[1] for hostname in local_job.children if
subjob_hostname = next((hostname.split('@')[1] for hostname in parent_job.children if
hostname.split('@')[0] == subjob_id), None)
local_job.children[f'{subjob_id}@{subjob_hostname}'] = subjob_data
subjob_key = f'{subjob_id}@{subjob_hostname}'
logname = f"{local_job.id}:{subjob_id}@{subjob_hostname}"
# Update the local job's subjob data
parent_job.children = dict(parent_job.children) # copy as dict to work around sqlalchemy update issue
parent_job.children[subjob_key] = subjob_data
logname = f"{parent_job_id}:{subjob_key}"
subjob_status = string_to_status(subjob_data['status'])
logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}")
# Download complete or partial render jobs
if subjob_status in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR] and \
subjob_data['file_count']:
download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname)
if not download_result:
# todo: handle error
# Handle downloading for completed, cancelled, or error'd subjobs
if (subjob_status in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR]
and subjob_data['file_count']):
if not cls.download_from_subjob(parent_job, subjob_id, subjob_hostname):
logger.error(f"Unable to download subjob files from {logname} with status {subjob_status.value}")
# Handle cancelled or errored subjobs by determining missing frames and scheduling a new job
if subjob_status == RenderStatus.CANCELLED or subjob_status == RenderStatus.ERROR:
# todo: determine missing frames and schedule new job
pass
logger.info("Creating a new subjob")
cls.new_create_subjob(parent_job.id, socket.gethostname(),
parent_job.children[subjob_key]['start_frame'],
parent_job.children[subjob_key]['end_frame'])
# todo: determine why we don't wait for the new subjobs we create when replacing an error'd job
@staticmethod
def determine_missing_frames(parent_job_id):
"""
Determine which frames of the parent job are still missing.
Parameters:
parent_job_id (str): ID of the parent job to check for missing frames.
Returns:
list: List of missing frame numbers.
"""
# todo: Implement the logic to determine missing frames from the parent job's subjob data
missing_frames = []
return missing_frames
@staticmethod
def download_from_subjob(local_job, subjob_id, subjob_hostname):
@@ -109,13 +161,11 @@ class DistributedJobManager:
# download zip file from server
try:
local_job.children[child_key]['download_status'] = 'working'
logger.info(f"Downloading completed subjob files from {subjob_hostname} to localhost")
RenderServerProxy(subjob_hostname).get_job_files(subjob_id, zip_file_path)
logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}")
except Exception as e:
logger.exception(f"Exception downloading files from remote server: {e}")
local_job.children[child_key]['download_status'] = 'failed'
return False
# extract zip
@@ -126,128 +176,173 @@ class DistributedJobManager:
zip_ref.extractall(extract_path)
logger.info(f"Successfully extracted zip to: {extract_path}")
os.remove(zip_file_path)
local_job.children[child_key]['download_status'] = 'complete'
return True
except Exception as e:
logger.exception(f"Exception extracting zip file: {e}")
local_job.children[child_key]['download_status'] = 'failed'
return False
return local_job.children[child_key].get('download_status', None) == 'complete'
@classmethod
def wait_for_subjobs(cls, local_job):
logger.debug(f"Waiting for subjobs for job {local_job}")
local_job.status = RenderStatus.WAITING_FOR_SUBJOBS
statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED]
def wait_for_subjobs(cls, parent_job):
"""
Wait for subjobs to complete and update the parent job's status.
def subjobs_not_downloaded():
return {k: v for k, v in local_job.children.items() if 'download_status' not in v or
v['download_status'] == 'working' or v['download_status'] is None}
This method polls each subjob's status until all of them are completed, cancelled, or errored,
updating the parent job's children with the latest subjob information.
logger.info(f'Waiting on {len(subjobs_not_downloaded())} subjobs for {local_job.id}')
Parameters:
parent_job (BaseRenderWorker): The parent job worker.
while len(subjobs_not_downloaded()):
for child_key, subjob_cached_data in subjobs_not_downloaded().items():
Returns:
None
"""
logger.debug(f"Waiting for subjobs for job {parent_job}")
parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS
server_proxys = {}
subjob_id = child_key.split('@')[0]
subjob_hostname = child_key.split('@')[-1]
def fetch_subjob_info(child_key):
"""
Fetch subjob information from the remote server using a RenderServerProxy.
Parameters:
child_key (str): The key representing the subjob.
Returns:
dict: Subjob information.
"""
subjob_id, subjob_hostname = child_key.split('@')
if subjob_hostname not in server_proxys:
server_proxys[subjob_hostname] = RenderServerProxy(subjob_hostname)
return server_proxys[subjob_hostname].get_job_info(subjob_id)
while True:
incomplete_jobs = {}
for child_key in list(
parent_job.children.keys()): # Create a list to avoid dictionary modification during iteration
subjob_data = fetch_subjob_info(child_key)
# Fetch info from server and handle failing case
subjob_data = RenderServerProxy(subjob_hostname).get_job_info(subjob_id)
if not subjob_data:
logger.warning(f"No response from: {subjob_hostname}")
# todo: handle timeout / missing server situations
subjob_id, subjob_hostname = child_key.split('@')
last_connection = datetime.datetime.now() - server_proxys[subjob_hostname].last_contact
logger.warning(f"No response from: {subjob_hostname} - Last connection: {last_connection}")
last_connection_max_time = 12
if last_connection.seconds > last_connection_max_time:
logger.error(
f"{subjob_hostname} has been offline for over {last_connection_max_time} seconds - Assuming render failed")
logger.warning(f"Spinning up a new subjob to replace the offlined server")
parent_job.children[child_key]['errors'] = ['Renderer went offline']
parent_job.children[child_key]['status'] = RenderStatus.ERROR
cls.handle_subjob_status_change(parent_job_id=parent_job.id,
subjob_data=parent_job.children[child_key])
continue
# Update parent job cache but keep the download status
download_status = local_job.children[child_key].get('download_status', None)
local_job.children[child_key] = subjob_data
local_job.children[child_key]['download_status'] = download_status
parent_job.children[child_key] = subjob_data
status = string_to_status(subjob_data.get('status', ''))
status_msg = f"Subjob {child_key} | {status} | " \
f"{float(subjob_data.get('percent_complete')) * 100.0}%"
status_msg = f"Subjob {child_key} | {status} | {float(subjob_data.get('percent_complete', 0)) * 100.0}%"
logger.debug(status_msg)
# Still working in another thread - keep waiting
if download_status == 'working':
continue
if status not in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR]:
incomplete_jobs[child_key] = subjob_data
# Check if job is finished but has not had its files copied over yet
if download_status is None and subjob_data['file_count'] and status in statuses_to_download:
download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname)
if not download_result:
logger.error("Failed to download from subjob")
# todo: error handling here
# Any finished jobs not successfully downloaded at this point are skipped
if local_job.children[child_key].get('download_status', None) is None and \
status in statuses_to_download:
logger.warning(f"Skipping waiting on downloading from subjob: {child_key}")
local_job.children[child_key]['download_status'] = 'skipped'
if subjobs_not_downloaded():
logger.debug(f"Waiting on {len(subjobs_not_downloaded())} subjobs on "
f"{', '.join(list(subjobs_not_downloaded().keys()))}")
time.sleep(5)
if incomplete_jobs:
logger.debug(f"Waiting on {len(incomplete_jobs)} subjobs on {', '.join(list(incomplete_jobs.keys()))}")
else:
logger.debug("No more incomplete subjobs")
if not cls.completion_hold_enabled:
break
time.sleep(5)
@classmethod
def split_into_subjobs(cls, worker, job_data, project_path):
def split_into_subjobs(cls, parent_worker, job_data, project_path):
# Check availability
available_servers = cls.find_available_servers(worker.renderer)
subjob_servers = cls.distribute_server_work(worker.start_frame, worker.end_frame, available_servers)
available_servers = cls.find_available_servers(parent_worker.renderer)
logger.debug(f"Splitting into subjobs - Available servers: {available_servers}")
subjob_frame_ranges = cls.distribute_server_work(parent_worker.start_frame, parent_worker.end_frame, available_servers)
local_hostname = socket.gethostname()
# Prep and submit these sub-jobs
logger.info(f"Job {worker.id} split plan: {subjob_servers}")
simple_ranges = [f"{x['hostname']}:[{x['frame_range'][0]}-{x['frame_range'][1]}]" for x in subjob_frame_ranges]
logger.info(f"Job {parent_worker.id} split plan: {','.join(simple_ranges)}")
try:
for server_data in subjob_servers:
server_hostname = server_data['hostname']
# setup parent render job first - truncate frames
local_range = [x for x in subjob_frame_ranges if x['hostname'] == local_hostname][0]
parent_worker.start_frame = max(local_range['frame_range'][0], parent_worker.start_frame)
parent_worker.end_frame = min(local_range['frame_range'][-1], parent_worker.end_frame)
logger.info(f"Local job now rendering from {parent_worker.start_frame} to {parent_worker.end_frame}")
RenderQueue.add_to_render_queue(parent_worker) # add range-adjusted parent to render queue
# setup remote subjobs
submission_results = {}
for subjob_server_data in subjob_frame_ranges:
server_hostname = subjob_server_data['hostname']
if server_hostname != local_hostname:
post_results = cls.__create_subjob(job_data, local_hostname, project_path, server_data,
server_hostname, worker)
post_results = cls.new_create_subjob(parent_worker.id, server_hostname,
subjob_server_data['frame_range'][0],
subjob_server_data['frame_range'][-1])
if post_results.ok:
server_data['submission_results'] = post_results.json()[0]
subjob_server_data['submission_results'] = post_results.json()[0]
else:
logger.error(f"Failed to create subjob on {server_hostname}")
break
else:
# truncate parent render_job
worker.start_frame = max(server_data['frame_range'][0], worker.start_frame)
worker.end_frame = min(server_data['frame_range'][-1], worker.end_frame)
logger.info(f"Local job now rendering from {worker.start_frame} to {worker.end_frame}")
server_data['submission_results'] = worker.json()
subjob_server_data['submission_results'] = [True]
# check that job posts were all successful.
if not all(d.get('submission_results') is not None for d in subjob_servers):
raise ValueError("Failed to create all subjobs") # look into recalculating job #s and use existing jobs
# if not all(d.get('submission_results') is not None for d in subjob_frame_ranges):
# # todo: rewrite this code - should not have to have all submissions go through
# raise ValueError("Failed to create all subjobs") # look into recalculating job #s and use existing jobs
# start subjobs
logger.debug(f"Starting {len(subjob_servers) - 1} attempted subjobs")
for server_data in subjob_servers:
if server_data['hostname'] != local_hostname:
child_key = f"{server_data['submission_results']['id']}@{server_data['hostname']}"
worker.children[child_key] = server_data['submission_results']
worker.name = f"{worker.name}[{worker.start_frame}-{worker.end_frame}]"
logger.debug(f"Starting {len(subjob_frame_ranges) - 1} attempted subjobs")
for subjob_server_data in subjob_frame_ranges:
if subjob_server_data['hostname'] != local_hostname:
child_key = f"{subjob_server_data['submission_results']['id']}@{subjob_server_data['hostname']}"
parent_worker.children[child_key] = subjob_server_data['submission_results']
parent_worker.name = f"{parent_worker.name}[{parent_worker.start_frame}-{parent_worker.end_frame}]"
except Exception as e:
# cancel all the subjobs
logger.error(f"Failed to split job into subjobs: {e}")
logger.debug(f"Cancelling {len(subjob_servers) - 1} attempted subjobs")
logger.exception(f"Failed to split job into subjobs: {e}")
logger.debug(f"Cancelling {len(subjob_frame_ranges) - 1} attempted subjobs")
# [RenderServerProxy(hostname).cancel_job(results['id'], confirm=True) for hostname, results in
# submission_results.items()] # todo: fix this
@staticmethod
def __create_subjob(job_data, local_hostname, project_path, server_data, server_hostname, worker):
subjob = job_data.copy()
subjob['name'] = f"{worker.name}[{server_data['frame_range'][0]}-{server_data['frame_range'][-1]}]"
subjob['parent'] = f"{worker.id}@{local_hostname}"
subjob['start_frame'] = server_data['frame_range'][0]
subjob['end_frame'] = server_data['frame_range'][-1]
logger.debug(f"Posting subjob with frames {subjob['start_frame']}-"
f"{subjob['end_frame']} to {server_hostname}")
post_results = RenderServerProxy(server_hostname).post_job_to_server(
file_path=project_path, job_list=[subjob])
def new_create_subjob(parent_job_id, remote_hostname, start_frame, end_frame):
"""
Create and post a subjob to a remote render server.
Parameters:
- parent_job_id (str): ID of the parent job.
- remote_hostname (str): Remote server's hostname/address.
- start_frame (int): Starting frame of the subjob.
- end_frame (int): Ending frame of the subjob.
Example:
new_create_subjob('parent_job_123', 'remote-server.example.com', 1, 100)
"""
logger.info(f"parentID: {parent_job_id}")
local_hostname = socket.gethostname()
parent_job = RenderQueue.job_with_id(parent_job_id)
subjob_data = {'renderer': parent_job.engine.name(), 'input_path': parent_job.input_path,
'args': parent_job.args, 'output_path': parent_job.output_path,
'engine_version': parent_job.renderer_version, 'start_frame': start_frame,
'end_frame': end_frame, 'parent': f"{parent_job_id}@{local_hostname}"}
logger.info(f"Creating subjob {os.path.basename(parent_job.input_path)} [{start_frame}-{end_frame}] "
f"for {remote_hostname}")
post_results = RenderServerProxy(remote_hostname).post_job_to_server(
file_path=parent_job.input_path, job_list=[subjob_data])
post_results_json = post_results.json()[0]
parent_job.children[f"{post_results_json['id']}@{remote_hostname}"] = post_results_json
return post_results
@staticmethod
@@ -323,17 +418,17 @@ class DistributedJobManager:
return server_breakdown
@staticmethod
def find_available_servers(renderer):
def find_available_servers(engine_name):
"""
Scan the Zeroconf network for currently available render servers supporting a specific renderer.
Scan the Zeroconf network for currently available render servers supporting a specific engine.
:param renderer: str, The renderer type to search for
:param engine_name: str, The engine type to search for
:return: A list of dictionaries with each dict containing hostname and cpu_count of available servers
"""
available_servers = []
for hostname in ZeroconfServer.found_clients():
response = RenderServerProxy(hostname).get_status()
if response and response.get('renderers', {}).get(renderer, {}).get('is_available', False):
available_servers.append({'hostname': hostname, 'cpu_count': int(response['cpu_count'])})
response = RenderServerProxy(hostname).is_engine_available(engine_name)
if response and response.get('available', False):
available_servers.append(response)
return available_servers
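
distribute_server_work appears only as a fragment in this diff; a hypothetical sketch of the cpu-weighted frame split its callers imply (the hostname/cpu_count/frame_range dict fields match the code above, the weighting itself is an assumption):

def distribute_frames(start_frame, end_frame, servers):
    # Hypothetical: split [start_frame, end_frame] across servers,
    # weighted by each server's reported cpu_count.
    total_frames = end_frame - start_frame + 1
    total_cpus = sum(s['cpu_count'] for s in servers)
    plan, cursor = [], start_frame
    for i, server in enumerate(servers):
        if cursor > end_frame:
            break
        share = max(1, round(total_frames * server['cpu_count'] / total_cpus))
        last = end_frame if i == len(servers) - 1 else min(cursor + share - 1, end_frame)
        plan.append({'hostname': server['hostname'], 'frame_range': [cursor, last]})
        cursor = last + 1
    return plan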

View File

@@ -1,7 +1,4 @@
try:
from .base_engine import *
except ImportError:
from base_engine import *
from src.engines.core.base_engine import BaseRenderEngine
class AERender(BaseRenderEngine):

View File

@@ -6,8 +6,8 @@ import os
import re
import time
from src.workers.base_worker import BaseRenderWorker, timecode_to_frames
from src.engines.aerender_engine import AERender
from src.engines.core.base_worker import BaseRenderWorker, timecode_to_frames
from src.engines.aerender.aerender_engine import AERender
def aerender_path():

View File

@@ -1,11 +1,10 @@
import logging
import os
import platform
import re
import requests
from .downloader_core import download_and_extract_app
from src.engines.core.downloader_core import download_and_extract_app
from src.utilities.misc_helper import current_system_os, current_system_cpu
# url = "https://download.blender.org/release/"
url = "https://ftp.nluug.nl/pub/graphics/blender/release/" # much faster mirror for testing
@@ -19,7 +18,7 @@ class BlenderDownloader:
@staticmethod
def get_major_versions():
try:
response = requests.get(url)
response = requests.get(url, timeout=5)
response.raise_for_status()
# Use regex to find all the <a> tags and extract the href attribute
@@ -36,27 +35,42 @@ class BlenderDownloader:
@staticmethod
def get_minor_versions(major_version, system_os=None, cpu=None):
try:
base_url = url + 'Blender' + major_version
response = requests.get(base_url)
response = requests.get(base_url, timeout=5)
response.raise_for_status()
versions_pattern = r'<a href="(?P<file>[^"]+)">blender-(?P<version>[\d\.]+)-(?P<system_os>\w+)-(?P<cpu>\w+).*</a>'
versions_data = [match.groupdict() for match in re.finditer(versions_pattern, response.text)]
# Filter to just the supported formats
versions_data = [item for item in versions_data if any(item["file"].endswith(ext) for ext in supported_formats)]
if system_os:
# Filter down OS and CPU
system_os = system_os or current_system_os()
cpu = cpu or current_system_cpu()
versions_data = [x for x in versions_data if x['system_os'] == system_os]
if cpu:
versions_data = [x for x in versions_data if x['cpu'] == cpu]
for v in versions_data:
v['url'] = os.path.join(base_url, v['file'])
for v in versions_data:
v['url'] = base_url + '/' + v['file']
versions_data = sorted(versions_data, key=lambda x: x['version'], reverse=True)
return versions_data
except requests.exceptions.HTTPError as e:
logger.error(f"Invalid url: {e}")
except Exception as e:
logger.exception(e)
return []
@classmethod
def version_is_available_to_download(cls, version, system_os=None, cpu=None):
requested_major_version = '.'.join(version.split('.')[:2])
minor_versions = cls.get_minor_versions(requested_major_version, system_os, cpu)
for minor in minor_versions:
if minor['version'] == version:
return minor
return None
@staticmethod
def find_LTS_versions():
@@ -71,18 +85,18 @@ class BlenderDownloader:
return lts_versions
@classmethod
def find_most_recent_version(cls, system_os, cpu, lts_only=False):
def find_most_recent_version(cls, system_os=None, cpu=None, lts_only=False):
try:
major_version = cls.find_LTS_versions()[0] if lts_only else cls.get_major_versions()[0]
most_recent = cls.get_minor_versions(major_version, system_os, cpu)[0]
return most_recent
most_recent = cls.get_minor_versions(major_version=major_version, system_os=system_os, cpu=cpu)
return most_recent[0]
except IndexError:
logger.error("Cannot find a most recent version")
@classmethod
def download_engine(cls, version, download_location, system_os=None, cpu=None):
system_os = system_os or platform.system().lower().replace('darwin', 'macos')
cpu = cpu or platform.machine().lower().replace('amd64', 'x64')
def download_engine(cls, version, download_location, system_os=None, cpu=None, timeout=120):
system_os = system_os or current_system_os()
cpu = cpu or current_system_cpu()
try:
logger.info(f"Requesting download of blender-{version}-{system_os}-{cpu}")
@@ -90,7 +104,8 @@ class BlenderDownloader:
minor_versions = [x for x in cls.get_minor_versions(major_version, system_os, cpu) if x['version'] == version]
# we get the URL instead of calculating it ourselves. May change this
download_and_extract_app(remote_url=minor_versions[0]['url'], download_location=download_location)
download_and_extract_app(remote_url=minor_versions[0]['url'], download_location=download_location,
timeout=timeout)
except IndexError:
logger.error("Cannot find requested engine")

View File

@@ -1,10 +1,8 @@
try:
from .base_engine import *
except ImportError:
from base_engine import *
import json
import re
import logging
from src.engines.core.base_engine import *
from src.utilities.misc_helper import system_safe_path
logger = logging.getLogger()
@@ -13,6 +11,7 @@ class Blender(BaseRenderEngine):
install_paths = ['/Applications/Blender.app/Contents/MacOS/Blender']
supported_extensions = ['.blend']
binary_names = {'linux': 'blender', 'windows': 'blender.exe', 'macos': 'Blender'}
def version(self):
version = None
@@ -36,18 +35,17 @@ class Blender(BaseRenderEngine):
return subprocess.run([self.renderer_path(), '-b', project_path, '--python-expr', python_expression],
capture_output=True, timeout=timeout)
except Exception as e:
logger.warning(f"Error running python expression in blender: {e}")
pass
logger.error(f"Error running python expression in blender: {e}")
else:
raise FileNotFoundError(f'Project file not found: {project_path}')
def run_python_script(self, project_path, script_path, timeout=None):
if os.path.exists(project_path) and os.path.exists(script_path):
try:
return subprocess.run([self.default_renderer_path(), '-b', project_path, '--python', script_path],
return subprocess.run([self.renderer_path(), '-b', project_path, '--python', script_path],
capture_output=True, timeout=timeout)
except Exception as e:
logger.warning(f"Error running python expression in blender: {e}")
logger.warning(f"Error running python script in blender: {e}")
pass
elif not os.path.exists(project_path):
raise FileNotFoundError(f'Project file not found: {project_path}')
@@ -58,8 +56,8 @@ class Blender(BaseRenderEngine):
def get_scene_info(self, project_path, timeout=10):
scene_info = {}
try:
results = self.run_python_script(project_path, os.path.join(os.path.dirname(os.path.realpath(__file__)),
'scripts', 'blender', 'get_file_info.py'), timeout=timeout)
script_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'scripts', 'get_file_info.py')
results = self.run_python_script(project_path, system_safe_path(script_path), timeout=timeout)
result_text = results.stdout.decode()
for line in result_text.splitlines():
if line.startswith('SCENE_DATA:'):
@@ -76,8 +74,8 @@ class Blender(BaseRenderEngine):
# Credit to L0Lock for pack script - https://blender.stackexchange.com/a/243935
try:
logger.info(f"Starting to pack Blender file: {project_path}")
results = self.run_python_script(project_path, os.path.join(os.path.dirname(os.path.realpath(__file__)),
'scripts', 'blender', 'pack_project.py'), timeout=timeout)
script_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'scripts', 'pack_project.py')
results = self.run_python_script(project_path, system_safe_path(script_path), timeout=timeout)
result_text = results.stdout.decode()
dir_name = os.path.dirname(project_path)

View File

@@ -2,18 +2,18 @@
import re
from collections import Counter
from src.engines.blender_engine import Blender
from src.engines.blender.blender_engine import Blender
from src.utilities.ffmpeg_helper import image_sequence_to_video
from src.workers.base_worker import *
from src.engines.core.base_worker import *
class BlenderRenderWorker(BaseRenderWorker):
engine = Blender
def __init__(self, input_path, output_path, args=None, parent=None, name=None):
super(BlenderRenderWorker, self).__init__(input_path=input_path, output_path=output_path, args=args,
parent=parent, name=name)
def __init__(self, input_path, output_path, engine_path, args=None, parent=None, name=None):
super(BlenderRenderWorker, self).__init__(input_path=input_path, output_path=output_path,
engine_path=engine_path, args=args, parent=parent, name=name)
# Args
self.blender_engine = self.args.get('engine', 'BLENDER_EEVEE').upper()
@@ -24,15 +24,14 @@ class BlenderRenderWorker(BaseRenderWorker):
self.__frame_percent_complete = 0.0
# Scene Info
self.scene_info = Blender().get_scene_info(input_path)
self.scene_info = Blender(engine_path).get_scene_info(input_path)
self.start_frame = int(self.scene_info.get('start_frame', 1))
self.end_frame = int(self.scene_info.get('end_frame', self.start_frame))
self.project_length = (self.end_frame - self.start_frame) + 1
self.current_frame = -1
def generate_worker_subprocess(self):
cmd = [self.engine.default_renderer_path()]
cmd = [self.renderer_path]
if self.args.get('background', True): # optionally run render not in background
cmd.append('-b')
cmd.append(self.input_path)

View File

@@ -13,6 +13,8 @@ class BaseRenderEngine(object):
def __init__(self, custom_path=None):
self.custom_renderer_path = custom_path
if not self.renderer_path():
raise FileNotFoundError(f"Cannot find path to renderer for {self.name()} instance")
def renderer_path(self):
return self.custom_renderer_path or self.default_renderer_path()
@@ -24,9 +26,9 @@ class BaseRenderEngine(object):
@classmethod
def default_renderer_path(cls):
path = None
try:
try: # Linux and macOS
path = subprocess.check_output(['which', cls.name()], timeout=SUBPROCESS_TIMEOUT).decode('utf-8').strip()
except subprocess.CalledProcessError:
except (subprocess.CalledProcessError, FileNotFoundError):
for p in cls.install_paths:
if os.path.exists(p):
path = p
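
To illustrate the contract this base class now enforces, a hypothetical minimal subclass; the class attributes mirror the Blender and FFMPEG engines in this changeset, and the install path is illustrative only.

from src.engines.core.base_engine import BaseRenderEngine

class ExampleEngine(BaseRenderEngine):
    # hypothetical engine, shown only to illustrate the expected attributes
    install_paths = ['/opt/example/bin/example']
    supported_extensions = ['.exm']
    binary_names = {'linux': 'example', 'windows': 'example.exe', 'macos': 'example'}

# __init__ now raises FileNotFoundError when no renderer path resolves,
# so callers should be ready to catch it.
try:
    engine = ExampleEngine()
except FileNotFoundError as e:
    print(e)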

View File

@@ -30,10 +30,12 @@ class BaseRenderWorker(Base):
end_time = Column(DateTime, nullable=True)
renderer = Column(String)
renderer_version = Column(String)
renderer_path = Column(String)
priority = Column(Integer)
project_length = Column(Integer)
start_frame = Column(Integer)
end_frame = Column(Integer, nullable=True)
current_frame = Column(Integer)
parent = Column(String, nullable=True)
children = Column(JSON)
name = Column(String)
@@ -42,7 +44,7 @@ class BaseRenderWorker(Base):
engine = None
def __init__(self, input_path, output_path, priority=2, args=None, ignore_extensions=True, parent=None,
def __init__(self, input_path, output_path, engine_path, priority=2, args=None, ignore_extensions=True, parent=None,
name=None):
if not ignore_extensions:
@@ -64,7 +66,8 @@ class BaseRenderWorker(Base):
self.args = args or {}
self.date_created = datetime.now()
self.renderer = self.engine.name()
self.renderer_version = self.engine().version()
self.renderer_path = engine_path
self.renderer_version = self.engine(engine_path).version()
self.custom_renderer_path = None
self.priority = priority
self.parent = parent
@@ -73,7 +76,7 @@ class BaseRenderWorker(Base):
# Frame Ranges
self.project_length = -1
self.current_frame = 0 # should this be a 1 ?
self.current_frame = -1 # negative indicates not started
self.start_frame = 0 # should this be a 1 ?
self.end_frame = None
@@ -159,7 +162,7 @@ class BaseRenderWorker(Base):
self.errors.append(msg)
return
if not self.engine.default_renderer_path() and not self.custom_renderer_path:
if not os.path.exists(self.renderer_path):
self.status = RenderStatus.ERROR
msg = 'Cannot find render engine path for {}'.format(self.engine.name())
logger.error(msg)
@@ -168,7 +171,7 @@ class BaseRenderWorker(Base):
self.status = RenderStatus.RUNNING
self.start_time = datetime.now()
logger.info(f'Starting {self.engine.name()} {self.engine().version()} Render for {self.input_path} | '
logger.info(f'Starting {self.engine.name()} {self.renderer_version} Render for {self.input_path} | '
f'Frame Count: {self.total_frames}')
self.__thread.start()
@@ -183,7 +186,7 @@ class BaseRenderWorker(Base):
with open(self.log_path(), "a") as f:
f.write(f"{self.start_time.isoformat()} - Starting {self.engine.name()} {self.engine().version()} "
f.write(f"{self.start_time.isoformat()} - Starting {self.engine.name()} {self.renderer_version} "
f"render for {self.input_path}\n\n")
f.write(f"Running command: {subprocess_cmds}\n")
f.write('=' * 80 + '\n\n')
@@ -234,7 +237,7 @@ class BaseRenderWorker(Base):
if self.children:
from src.distributed_job_manager import DistributedJobManager
DistributedJobManager.wait_for_subjobs(local_job=self)
DistributedJobManager.wait_for_subjobs(parent_job=self)
# Post Render Work
logger.debug("Starting post-processing work")
@@ -302,7 +305,6 @@ class BaseRenderWorker(Base):
'children': self.children,
'date_created': self.date_created,
'start_time': self.start_time,
'end_time': self.end_time,
'status': self.status.value,
'file_hash': self.file_hash,
'percent_complete': self.percent_complete(),
@@ -312,6 +314,7 @@ class BaseRenderWorker(Base):
'errors': getattr(self, 'errors', None),
'start_frame': self.start_frame,
'end_frame': self.end_frame,
'current_frame': self.current_frame,
'total_frames': self.total_frames,
'last_output': getattr(self, 'last_output', None),
'log_path': self.log_path()

View File

@@ -12,7 +12,7 @@ supported_formats = ['.zip', '.tar.xz', '.dmg']
logger = logging.getLogger()
def download_and_extract_app(remote_url, download_location):
def download_and_extract_app(remote_url, download_location, timeout=120):
# Create a temp download directory
temp_download_dir = tempfile.mkdtemp()
@@ -30,7 +30,7 @@ def download_and_extract_app(remote_url, download_location):
if not os.path.exists(temp_downloaded_file_path):
# Make a GET request to the URL with stream=True to enable streaming
logger.info(f"Downloading {output_dir_name} from {remote_url}")
response = requests.get(remote_url, stream=True)
response = requests.get(remote_url, stream=True, timeout=timeout)
# Check if the request was successful
if response.status_code == 200:
@@ -54,6 +54,7 @@ def download_and_extract_app(remote_url, download_location):
logger.info(f"Successfully downloaded {os.path.basename(temp_downloaded_file_path)}")
else:
logger.error(f"Failed to download the file. Status code: {response.status_code}")
return
os.makedirs(download_location, exist_ok=True)
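
A hedged example of calling the helper after the timeout change; the URL and destination below are placeholders.

from src.engines.core.downloader_core import download_and_extract_app

# timeout (seconds) is now forwarded to requests.get, so a stalled
# mirror no longer hangs the download indefinitely.
download_and_extract_app(remote_url='https://example.com/ffmpeg-6.0.zip',  # placeholder
                         download_location='/tmp/engines',                 # placeholder
                         timeout=120)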

View File

@@ -0,0 +1,61 @@
import logging
from src.engines.engine_manager import EngineManager
logger = logging.getLogger()
class RenderWorkerFactory:
@staticmethod
def supported_classes():
# to add support for any additional RenderWorker classes, import their classes and add to list here
from src.engines.blender.blender_worker import BlenderRenderWorker
from src.engines.aerender.aerender_worker import AERenderWorker
from src.engines.ffmpeg.ffmpeg_worker import FFMPEGRenderWorker
classes = [BlenderRenderWorker, AERenderWorker, FFMPEGRenderWorker]
return classes
@staticmethod
def create_worker(renderer, input_path, output_path, engine_version=None, args=None, parent=None, name=None):
worker_class = RenderWorkerFactory.class_for_name(renderer)
# check to make sure we have versions installed
all_versions = EngineManager.all_versions_for_engine(renderer)
if not all_versions:
raise FileNotFoundError(f"Cannot find any installed {renderer} engines")
# Find the path to the requested engine version or use default
engine_path = None if engine_version else all_versions[0]['path']
if engine_version:
for ver in all_versions:
if ver['version'] == engine_version:
engine_path = ver['path']
break
# Download the required engine if not found locally
if not engine_path:
download_result = EngineManager.download_engine(renderer, engine_version)
if not download_result:
raise FileNotFoundError(f"Cannot download requested version: {renderer} {engine_version}")
engine_path = download_result['path']
logger.info("Engine downloaded. Creating worker.")
if not engine_path:
raise FileNotFoundError(f"Cannot find requested engine version {engine_version}")
return worker_class(input_path=input_path, output_path=output_path, engine_path=engine_path, args=args,
parent=parent, name=name)
@staticmethod
def supported_renderers():
return [x.engine.name() for x in RenderWorkerFactory.supported_classes()]
@staticmethod
def class_for_name(name):
name = name.lower()
for render_class in RenderWorkerFactory.supported_classes():
if render_class.engine.name() == name:
return render_class
raise LookupError(f'Cannot find class for name: {name}')
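
A short sketch of how the new factory is meant to be driven; the paths and version string are hypothetical.

from src.workers.worker_factory import RenderWorkerFactory

print(RenderWorkerFactory.supported_renderers())  # names of the registered engines

# If the requested version is not installed locally, the factory asks
# EngineManager to download it before constructing the worker.
worker = RenderWorkerFactory.create_worker(renderer='blender',
                                           input_path='/projects/scene.blend',
                                           output_path='/renders/scene/',
                                           engine_version='3.6.5')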

View File

@@ -1,112 +0,0 @@
import logging
import os
import platform
import re
import requests
from .downloader_core import download_and_extract_app
logger = logging.getLogger()
supported_formats = ['.zip', '.tar.xz', '.dmg']
class FFMPEGDownloader:
# macOS FFMPEG mirror maintained by Evermeet - https://evermeet.cx/ffmpeg/
macos_url = "https://evermeet.cx/pub/ffmpeg/"
# Linux FFMPEG mirror maintained by John van Sickle - https://johnvansickle.com/ffmpeg/
linux_url = "https://johnvansickle.com/ffmpeg/"
# Windows FFMPEG builds maintained by GyanD - https://www.gyan.dev/ffmpeg/builds/
windows_download_url = "https://github.com/GyanD/codexffmpeg/releases/download/"
windows_api_url = "https://api.github.com/repos/GyanD/codexffmpeg/releases"
@classmethod
def get_macos_versions(cls):
response = requests.get(cls.macos_url)
response.raise_for_status()
link_pattern = r'>(.*\.zip)[^\.]'
link_matches = re.findall(link_pattern, response.text)
return [link.split('-')[-1].split('.zip')[0] for link in link_matches]
@classmethod
def get_linux_versions(cls):
# Link 1 / 2 - Current Version
response = requests.get(cls.linux_url)
response.raise_for_status()
current_release = re.findall(r'release: ([\w\.]+)', response.text)[0]
# Link 2 / 2 - Previous Versions
response = requests.get(os.path.join(cls.linux_url, 'old-releases'))
response.raise_for_status()
releases = list(set(re.findall(r'href="ffmpeg-([\w\.]+)-.*">ffmpeg', response.text)))
releases.sort(reverse=True)
releases.insert(0, current_release)
return releases
@classmethod
def get_windows_versions(cls):
response = requests.get(cls.windows_api_url)
response.raise_for_status()
versions = []
all_git_releases = response.json()
for item in all_git_releases:
if re.match(r'^[0-9.]+$', item['tag_name']):
versions.append(item['tag_name'])
return versions
@classmethod
def find_most_recent_version(cls, system_os, cpu, lts_only=False):
pass
@classmethod
def download_engine(cls, version, download_location, system_os=None, cpu=None):
system_os = system_os or platform.system().lower().replace('darwin', 'macos')
cpu = cpu or platform.machine().lower().replace('amd64', 'x64')
# Verify requested version is available
remote_url = None
versions_per_os = {'linux': cls.get_linux_versions, 'macos': cls.get_macos_versions, 'windows': cls.get_windows_versions}
if not versions_per_os.get(system_os):
logger.error(f"Cannot find version list for {system_os}")
return
if version not in versions_per_os[system_os]():
logger.error(f"Cannot find FFMPEG version {version} for {system_os}")
# Platform specific naming cleanup
if system_os == 'macos':
remote_url = os.path.join(cls.macos_url, f"ffmpeg-{version}.zip")
download_location = os.path.join(download_location, f'ffmpeg-{version}-{system_os}-{cpu}') # override location to match linux
elif system_os == 'linux':
release_dir = 'releases' if version == cls.get_linux_versions()[0] else 'old-releases'
remote_url = os.path.join(cls.linux_url, release_dir, f'ffmpeg-{version}-{cpu}-static.tar.xz')
elif system_os == 'windows':
remote_url = os.path.join(cls.windows_download_url, version, f'ffmpeg-{version}-full_build.zip')
# Download and extract
try:
logger.info(f"Requesting download of ffmpeg-{version}-{system_os}-{cpu}")
download_and_extract_app(remote_url=remote_url, download_location=download_location)
# naming cleanup to match existing naming convention
if system_os == 'linux':
os.rename(os.path.join(download_location, f'ffmpeg-{version}-{cpu}-static'),
os.path.join(download_location, f'ffmpeg-{version}-{system_os}-{cpu}'))
elif system_os == 'windows':
os.rename(os.path.join(download_location, f'ffmpeg-{version}-full_build'),
os.path.join(download_location, f'ffmpeg-{version}-{system_os}-{cpu}'))
except IndexError:
logger.error("Cannot download requested engine")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# print(FFMPEGDownloader.download_engine('6.0', '/Users/brett/zordon-uploads/engines/'))
print(FFMPEGDownloader.download_engine(version='6.0', download_location='/Users/brett/zordon-uploads/engines/'))

View File

@@ -1,18 +1,13 @@
import os
import logging
import platform
import os
import shutil
from .downloaders.blender_downloader import BlenderDownloader
from .downloaders.ffmpeg_downloader import FFMPEGDownloader
import threading
try:
from .blender_engine import Blender
except ImportError:
from blender_engine import Blender
try:
from .ffmpeg_engine import FFMPEG
except ImportError:
from ffmpeg_engine import FFMPEG
from src.engines.blender.blender_downloader import BlenderDownloader
from src.engines.blender.blender_engine import Blender
from src.engines.ffmpeg.ffmpeg_downloader import FFMPEGDownloader
from src.engines.ffmpeg.ffmpeg_engine import FFMPEG
from src.utilities.misc_helper import system_safe_path, current_system_os, current_system_cpu
logger = logging.getLogger()
@@ -20,6 +15,11 @@ logger = logging.getLogger()
class EngineManager:
engines_path = "~/zordon-uploads/engines"
downloader_classes = {
"blender": BlenderDownloader,
"ffmpeg": FFMPEGDownloader,
# Add more engine types and corresponding downloader classes as needed
}
@classmethod
def supported_engines(cls):
@@ -37,14 +37,25 @@ class EngineManager:
# Split the input string by dashes to get segments
segments = directory.split('-')
# Define the keys for each word
keys = ["engine", "version", "system_os", "cpu"]
# Create a dictionary with named keys
executable_names = {'linux': 'blender', 'windows': 'blender.exe', 'macos': 'Blender.app'}
keys = ["engine", "version", "system_os", "cpu"]
result_dict = {keys[i]: segments[i] for i in range(min(len(keys), len(segments)))}
result_dict['path'] = os.path.join(cls.engines_path, directory, executable_names.get(result_dict['system_os'], 'unknown'))
result_dict['type'] = 'managed'
# Figure out the binary name for the path
binary_name = result_dict['engine'].lower()
for eng in cls.supported_engines():
if eng.name().lower() == result_dict['engine']:
binary_name = eng.binary_names.get(result_dict['system_os'], binary_name)
# Find path to binary
path = None
for root, _, files in os.walk(system_safe_path(os.path.join(cls.engines_path, directory))):
if binary_name in files:
path = os.path.join(root, binary_name)
break
result_dict['path'] = path
results.append(result_dict)
except FileNotFoundError:
logger.warning("Cannot find local engines download directory")
@@ -53,8 +64,8 @@ class EngineManager:
for eng in cls.supported_engines():
if eng.default_renderer_path():
results.append({'engine': eng.name(), 'version': eng().version(),
'system_os': cls.system_os(),
'cpu': cls.system_cpu(),
'system_os': current_system_os(),
'cpu': current_system_cpu(),
'path': eng.default_renderer_path(), 'type': 'system'})
return results
@@ -65,8 +76,8 @@ class EngineManager:
@classmethod
def newest_engine_version(cls, engine, system_os=None, cpu=None):
system_os = system_os or cls.system_os()
cpu = cpu or cls.system_cpu()
system_os = system_os or current_system_os()
cpu = cpu or current_system_cpu()
try:
filtered = [x for x in cls.all_engines() if x['engine'] == engine and x['system_os'] == system_os and x['cpu'] == cpu]
@@ -77,52 +88,56 @@ class EngineManager:
return None
@classmethod
def has_engine_version(cls, engine, version, system_os=None, cpu=None):
system_os = system_os or cls.system_os()
cpu = cpu or cls.system_cpu()
def is_version_downloaded(cls, engine, version, system_os=None, cpu=None):
system_os = system_os or current_system_os()
cpu = cpu or current_system_cpu()
filtered = [x for x in cls.all_engines() if
x['engine'] == engine and x['system_os'] == system_os and x['cpu'] == cpu and x['version'] == version]
return filtered[0] if filtered else False
@staticmethod
def system_os():
return platform.system().lower().replace('darwin', 'macos')
@classmethod
def version_is_available_to_download(cls, engine, version, system_os=None, cpu=None):
try:
return cls.downloader_classes[engine].version_is_available_to_download(version=version, system_os=system_os,
cpu=cpu)
except Exception:  # unknown engine or failed remote lookup
return None
@staticmethod
def system_cpu():
return platform.machine().lower().replace('amd64', 'x64')
@classmethod
def find_most_recent_version(cls, engine=None, system_os=None, cpu=None, lts_only=False):
try:
return cls.downloader_classes[engine].find_most_recent_version(system_os=system_os, cpu=cpu)
except Exception:  # unknown engine or failed remote lookup
return None
@classmethod
def download_engine(cls, engine, version, system_os=None, cpu=None):
existing_download = cls.has_engine_version(engine, version, system_os, cpu)
existing_download = cls.is_version_downloaded(engine, version, system_os, cpu)
if existing_download:
logger.info(f"Requested download of {engine} {version}, but local copy already exists")
return existing_download
logger.info(f"Requesting download of {engine} {version}")
downloader_classes = {
"blender": BlenderDownloader,
"ffmpeg": FFMPEGDownloader,
# Add more engine types and corresponding downloader classes as needed
}
# Check if the provided engine type is valid
if engine not in downloader_classes:
if engine not in cls.downloader_classes:
logger.error("No valid engine found")
return
# Get the appropriate downloader class based on the engine type
downloader = downloader_classes[engine]
if downloader.download_engine(version, download_location=cls.engines_path, system_os=system_os, cpu=cpu):
return cls.has_engine_version(engine, version, system_os, cpu)
else:
cls.downloader_classes[engine].download_engine(version, download_location=cls.engines_path,
system_os=system_os, cpu=cpu, timeout=300)
# Check that engine was properly downloaded
found_engine = cls.is_version_downloaded(engine, version, system_os, cpu)
if not found_engine:
logger.error(f"Error downloading {engine}")
return found_engine
@classmethod
def delete_engine_download(cls, engine, version, system_os=None, cpu=None):
logger.info(f"Requested deletion of engine: {engine}-{version}")
found = cls.has_engine_version(engine, version, system_os, cpu)
found = cls.is_version_downloaded(engine, version, system_os, cpu)
if found:
dir_path = os.path.dirname(found['path'])
shutil.rmtree(dir_path, ignore_errors=True)
@@ -131,9 +146,32 @@ class EngineManager:
else:
logger.error(f"Cannot find engine: {engine}-{version}")
@classmethod
def update_all_engines(cls):
def engine_update_task(engine, engine_downloader):
logger.debug(f"Checking for updates to {engine}")
latest_version = engine_downloader.find_most_recent_version()
if latest_version:
logger.debug(f"Latest version of {engine} available: {latest_version.get('version')}")
if not cls.is_version_downloaded(engine, latest_version.get('version')):
logger.info(f"Downloading {engine} ({latest_version['version']})")
cls.download_engine(engine=engine, version=latest_version['version'])
else:
logger.warning(f"Unable to get latest version for {engine}")
logger.info(f"Checking for updates for render engines...")
threads = []
for engine, engine_downloader in cls.downloader_classes.items():
thread = threading.Thread(target=engine_update_task, args=(engine, engine_downloader))
threads.append(thread)
thread.start()
for thread in threads: # wait to finish
thread.join()
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# print(EngineManager.newest_engine_version('blender', 'macos', 'arm64'))
EngineManager.delete_engine_download('blender', '3.2.1', 'macos', 'a')
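
Tying the renamed and added methods together, a hedged sketch; the version number is illustrative.

from src.engines.engine_manager import EngineManager

# is_version_downloaded (formerly has_engine_version) returns the matching
# engine record, or False when no local copy exists.
if not EngineManager.is_version_downloaded('blender', '3.6.5'):
    EngineManager.download_engine('blender', '3.6.5')

# Check every registered downloader for a newer build, one thread each.
EngineManager.update_all_engines()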

View File

@@ -0,0 +1,183 @@
import logging
import os
import re
import requests
from src.engines.core.downloader_core import download_and_extract_app
from src.utilities.misc_helper import current_system_cpu, current_system_os
logger = logging.getLogger()
supported_formats = ['.zip', '.tar.xz', '.dmg']
class FFMPEGDownloader:
# macOS FFMPEG mirror maintained by Evermeet - https://evermeet.cx/ffmpeg/
macos_url = "https://evermeet.cx/pub/ffmpeg/"
# Linux FFMPEG mirror maintained by John van Sickle - https://johnvansickle.com/ffmpeg/
linux_url = "https://johnvansickle.com/ffmpeg/"
# Windows FFMPEG builds maintained by GyanD - https://www.gyan.dev/ffmpeg/builds/
windows_download_url = "https://github.com/GyanD/codexffmpeg/releases/download/"
windows_api_url = "https://api.github.com/repos/GyanD/codexffmpeg/releases"
# used to cache renderer versions in case they need to be accessed frequently
version_cache = {}
@classmethod
def __get_macos_versions(cls, use_cache=True):
# cache the versions locally
version_cache = cls.version_cache.get('macos')
if version_cache and use_cache:
return version_cache
response = requests.get(cls.macos_url, timeout=5)
response.raise_for_status()
link_pattern = r'>(.*\.zip)[^\.]'
link_matches = re.findall(link_pattern, response.text)
releases = [link.split('-')[-1].split('.zip')[0] for link in link_matches]
cls.version_cache['macos'] = releases
return releases
@classmethod
def __get_linux_versions(cls, use_cache=True):
# cache the versions locally
version_cache = cls.version_cache.get('linux')
if version_cache and use_cache:
return version_cache
# Link 1 / 2 - Current Version
response = requests.get(cls.linux_url, timeout=5)
response.raise_for_status()
current_release = re.findall(r'release: ([\w\.]+)', response.text)[0]
# Link 2 / 2 - Previous Versions
response = requests.get(os.path.join(cls.linux_url, 'old-releases'), timeout=5)
response.raise_for_status()
releases = list(set(re.findall(r'href="ffmpeg-([\w\.]+)-.*">ffmpeg', response.text)))
releases.sort(reverse=True)
releases.insert(0, current_release)
# Add to cache
cls.version_cache['linux'] = releases
return releases
@classmethod
def __get_windows_versions(cls, use_cache=True):
version_cache = cls.version_cache.get('windows')
if version_cache and use_cache:
return version_cache
response = requests.get(cls.windows_api_url, timeout=5)
response.raise_for_status()
releases = []
all_git_releases = response.json()
for item in all_git_releases:
if re.match(r'^[0-9.]+$', item['tag_name']):
releases.append(item['tag_name'])
cls.version_cache['windows'] = releases
return releases
@classmethod
def find_most_recent_version(cls, system_os=None, cpu=None, lts_only=False):
try:
system_os = system_os or current_system_os()
cpu = cpu or current_system_cpu()
return cls.all_versions(system_os, cpu)[0]
except TypeError:
pass
return None
@classmethod
def all_versions(cls, system_os=None, cpu=None):
system_os = system_os or current_system_os()
cpu = cpu or current_system_cpu()
versions_per_os = {'linux': cls.__get_linux_versions, 'macos': cls.__get_macos_versions,
'windows': cls.__get_windows_versions}
if not versions_per_os.get(system_os):
logger.error(f"Cannot find version list for {system_os}")
return
results = []
all_versions = versions_per_os[system_os]()
for version in all_versions:
remote_url = cls.__get_remote_url_for_version(version=version, system_os=system_os, cpu=cpu)
results.append({'cpu': cpu, 'file': os.path.basename(remote_url), 'system_os': system_os, 'url': remote_url,
'version': version})
return results
@classmethod
def version_is_available_to_download(cls, version, system_os=None, cpu=None):
for ver in cls.all_versions(system_os, cpu):
if ver['version'] == version:
return ver
return None
@classmethod
def __get_remote_url_for_version(cls, version, system_os, cpu):
# Platform specific naming cleanup
remote_url = None
if system_os == 'macos':
remote_url = os.path.join(cls.macos_url, f"ffmpeg-{version}.zip")
elif system_os == 'linux':
cpu = cpu.replace('x64', 'amd64') # change cpu to match repo naming convention
latest_release = (version == cls.__get_linux_versions(use_cache=True)[0])
release_dir = 'releases' if latest_release else 'old-releases'
release_filename = f'ffmpeg-release-{cpu}-static.tar.xz' if latest_release else \
f'ffmpeg-{version}-{cpu}-static.tar.xz'
remote_url = os.path.join(cls.linux_url, release_dir, release_filename)
elif system_os == 'windows':
remote_url = f"{cls.windows_download_url.strip('/')}/{version}/ffmpeg-{version}-full_build.zip"
else:
logger.error("Unknown system os")
return remote_url
@classmethod
def download_engine(cls, version, download_location, system_os=None, cpu=None, timeout=120):
system_os = system_os or current_system_os()
cpu = cpu or current_system_cpu()
# Verify requested version is available
found_version = [item for item in cls.all_versions(system_os, cpu) if item['version'] == version]
if not found_version:
logger.error(f"Cannot find FFMPEG version {version} for {system_os} and {cpu}")
return
# Platform specific naming cleanup
remote_url = cls.__get_remote_url_for_version(version=version, system_os=system_os, cpu=cpu)
if system_os == 'macos': # override location to match linux
download_location = os.path.join(download_location, f'ffmpeg-{version}-{system_os}-{cpu}')
# Download and extract
try:
logger.info(f"Requesting download of ffmpeg-{version}-{system_os}-{cpu}")
download_and_extract_app(remote_url=remote_url, download_location=download_location, timeout=timeout)
# naming cleanup to match existing naming convention
output_path = os.path.join(download_location, f'ffmpeg-{version}-{system_os}-{cpu}')
if system_os == 'linux':
initial_cpu = cpu.replace('x64', 'amd64') # change cpu to match repo naming convention
os.rename(os.path.join(download_location, f'ffmpeg-{version}-{initial_cpu}-static'), output_path)
elif system_os == 'windows':
os.rename(os.path.join(download_location, f'ffmpeg-{version}-full_build'), output_path)
return output_path
except (IndexError, FileNotFoundError) as e:
logger.error(f"Cannot download requested engine: {e}")
except OSError as e:
logger.error(f"OS error while processing engine download: {e}")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# print(FFMPEGDownloader.download_engine('6.0', '/Users/brett/zordon-uploads/engines/'))
# print(FFMPEGDownloader.find_most_recent_version(system_os='linux'))
print(FFMPEGDownloader.download_engine(version='6.0', download_location='/Users/brett/zordon-uploads/engines/', system_os='linux', cpu='x64'))
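
For reference, a sketch of the new query surface; per all_versions above, each record carries cpu, file, system_os, url, and version keys.

from src.engines.ffmpeg.ffmpeg_downloader import FFMPEGDownloader

latest = FFMPEGDownloader.find_most_recent_version(system_os='linux', cpu='x64')
if latest:
    print(latest['version'], latest['url'])

# Returns the matching version record, or None if that build is unavailable.
print(FFMPEGDownloader.version_is_available_to_download('6.0', system_os='linux', cpu='x64'))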

View File

@@ -1,12 +1,12 @@
try:
from .base_engine import *
except ImportError:
from base_engine import *
import re
from src.engines.core.base_engine import *
class FFMPEG(BaseRenderEngine):
binary_names = {'linux': 'ffmpeg', 'windows': 'ffmpeg.exe', 'macos': 'ffmpeg'}
def version(self):
version = None
try:
@@ -31,11 +31,15 @@ class FFMPEG(BaseRenderEngine):
return encoders
def get_all_formats(self):
formats_raw = subprocess.check_output([self.renderer_path(), '-formats'], stderr=subprocess.DEVNULL,
timeout=SUBPROCESS_TIMEOUT).decode('utf-8')
pattern = '(?P<type>[DE]{1,2})\s+(?P<id>\S{2,})\s+(?P<name>.*)'
all_formats = [m.groupdict() for m in re.finditer(pattern, formats_raw)]
return all_formats
try:
formats_raw = subprocess.check_output([self.renderer_path(), '-formats'], stderr=subprocess.DEVNULL,
timeout=SUBPROCESS_TIMEOUT).decode('utf-8')
pattern = '(?P<type>[DE]{1,2})\s+(?P<id>\S{2,})\s+(?P<name>.*)\r'
all_formats = [m.groupdict() for m in re.finditer(pattern, formats_raw)]
return all_formats
except Exception as e:
logger.error(f"Error getting all formats: {e}")
return []
def extension_for_format(self, ffmpeg_format):
# Extract the common extension using regex
@@ -53,7 +57,7 @@ class FFMPEG(BaseRenderEngine):
return [x for x in self.get_all_formats() if 'E' in x['type'].upper()]
def get_frame_count(self, path_to_file):
raw_stdout = subprocess.check_output([self.default_renderer_path(), '-i', path_to_file, '-map', '0:v:0', '-c', 'copy',
raw_stdout = subprocess.check_output([self.renderer_path(), '-i', path_to_file, '-map', '0:v:0', '-c', 'copy',
'-f', 'null', '-'], stderr=subprocess.STDOUT,
timeout=SUBPROCESS_TIMEOUT).decode('utf-8')
match = re.findall(r'frame=\s*(\d+)', raw_stdout)

View File

@@ -2,8 +2,8 @@
import re
import subprocess
from src.workers.base_worker import BaseRenderWorker
from src.engines.ffmpeg_engine import FFMPEG
from src.engines.core.base_worker import BaseRenderWorker
from src.engines.ffmpeg.ffmpeg_engine import FFMPEG
class FFMPEGRenderWorker(BaseRenderWorker):
@@ -14,12 +14,11 @@ class FFMPEGRenderWorker(BaseRenderWorker):
super(FFMPEGRenderWorker, self).__init__(input_path=input_path, output_path=output_path, args=args,
parent=parent, name=name)
stream_info = subprocess.check_output([self.engine.default_renderer_path(), "-i", # https://stackoverflow.com/a/61604105
stream_info = subprocess.check_output([self.renderer_path, "-i", # https://stackoverflow.com/a/61604105
input_path, "-map", "0:v:0", "-c", "copy", "-f", "null", "-y",
"/dev/null"], stderr=subprocess.STDOUT).decode('utf-8')
found_frames = re.findall('frame=\s*(\d+)', stream_info)
self.project_length = int(found_frames[-1]) if found_frames else -1  # re.findall yields strings
self.current_frame = -1
def generate_worker_subprocess(self):

View File

@@ -5,8 +5,8 @@ from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from src.utilities.status_utils import RenderStatus
from src.workers.worker_factory import RenderWorkerFactory
from src.workers.base_worker import Base
from src.engines.engine_manager import EngineManager
from src.engines.core.base_worker import Base
logger = logging.getLogger()
@@ -33,10 +33,6 @@ class RenderQueue:
def start_queue(cls):
cls.load_state()
@classmethod
def job_status_change(cls, job_id, status):
logger.debug(f"Job status changed: {job_id} -> {status}")
@classmethod
def add_to_render_queue(cls, render_job, force_start=False):
logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job))
@@ -69,10 +65,12 @@ class RenderQueue:
@classmethod
def job_with_id(cls, job_id, none_ok=False):
found_job = next((x for x in cls.all_jobs() if x.id == job_id), None)
if not found_job and not none_ok:
raise JobNotFoundError(job_id)
return found_job
for job in cls.all_jobs():
if job.id == job_id:
return job
if not none_ok:
raise JobNotFoundError(f"Cannot find job with id: {job_id}")
return None
@classmethod
def clear_history(cls):
@@ -84,7 +82,7 @@ class RenderQueue:
@classmethod
def load_state(cls):
from src.workers.base_worker import BaseRenderWorker
from src.engines.core.base_worker import BaseRenderWorker
cls.job_queue = cls.session.query(BaseRenderWorker).all()
@classmethod
@@ -100,7 +98,8 @@ class RenderQueue:
@classmethod
def is_available_for_job(cls, renderer, priority=2):
if not RenderWorkerFactory.class_for_name(renderer).engine.default_renderer_path():
if not EngineManager.all_versions_for_engine(renderer):
return False
instances = cls.renderer_instances()

View File

@@ -1,12 +1,12 @@
import subprocess
from src.engines.ffmpeg_engine import FFMPEG
from src.engines.ffmpeg.ffmpeg_engine import FFMPEG
def image_sequence_to_video(source_glob_pattern, output_path, framerate=24, encoder="prores_ks", profile=4,
start_frame=1):
subprocess.run([FFMPEG.default_renderer_path(), "-framerate", str(framerate), "-start_number", str(start_frame), "-i",
f"{source_glob_pattern}", "-c:v", encoder, "-profile:v", str(profile), '-pix_fmt', 'yuva444p10le',
output_path], check=True)
output_path], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
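
A hedged usage example; the paths are placeholders. Despite the parameter name, the value is handed to ffmpeg's -i flag, so a printf-style frame pattern works together with -start_number.

from src.utilities.ffmpeg_helper import image_sequence_to_video

# Stitch frame_0001.png, frame_0002.png, ... into a ProRes 4444 movie.
image_sequence_to_video(source_glob_pattern='/renders/scene/frame_%04d.png',
                        output_path='/renders/scene.mov',
                        framerate=24,
                        start_frame=1)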
def save_first_frame(source_path, dest_path, max_width=1280):

View File

@@ -1,5 +1,6 @@
import logging
import os
import platform
import subprocess
from datetime import datetime
@@ -103,3 +104,22 @@ def get_file_size_human(file_path):
else:
return f"{size_in_bytes / 1024 ** 4:.2f} TB"
# Convert path to the appropriate format for the current platform
def system_safe_path(path):
if platform.system().lower() == "windows":
return os.path.normpath(path)
return path.replace("\\", "/")
def current_system_os():
return platform.system().lower().replace('darwin', 'macos')
def current_system_os_version():
return platform.mac_ver()[0] if current_system_os() == 'macos' else platform.release().lower()
def current_system_cpu():
# normalize amd64 / x86_64 to "x64"
return platform.machine().lower().replace('amd64', 'x64').replace('x86_64', 'x64')
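
A quick sketch of the new helpers; outputs in the comments are examples, not guarantees.

from src.utilities.misc_helper import system_safe_path, current_system_os, current_system_cpu

print(current_system_os())   # e.g. 'macos', 'linux', or 'windows'
print(current_system_cpu())  # e.g. 'x64' or 'arm64'

# Normalizes separators on Windows; forces forward slashes elsewhere.
print(system_safe_path('engines\\blender-3.6.5\\blender'))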

View File

@@ -1,27 +0,0 @@
class RenderWorkerFactory:
@staticmethod
def supported_classes():
# to add support for any additional RenderWorker classes, import their classes and add to list here
from src.workers.blender_worker import BlenderRenderWorker
from src.workers.aerender_worker import AERenderWorker
from src.workers.ffmpeg_worker import FFMPEGRenderWorker
classes = [BlenderRenderWorker, AERenderWorker, FFMPEGRenderWorker]
return classes
@staticmethod
def create_worker(renderer, input_path, output_path, args=None, parent=None, name=None):
worker_class = RenderWorkerFactory.class_for_name(renderer)
return worker_class(input_path=input_path, output_path=output_path, args=args, parent=parent, name=name)
@staticmethod
def supported_renderers():
return [x.engine.name() for x in RenderWorkerFactory.supported_classes()]
@staticmethod
def class_for_name(name):
name = name.lower()
for render_class in RenderWorkerFactory.supported_classes():
if render_class.engine.name() == name:
return render_class
raise LookupError(f'Cannot find class for name: {name}')

View File

@@ -1,5 +1,5 @@
#!/usr/bin/env python3
from src.api_server import start_server
from src.api.api_server import start_server
if __name__ == '__main__':
start_server()