mirror of
https://github.com/blw1138/Zordon.git
synced 2025-12-17 08:48:13 +00:00
Compare commits
33 Commits
windows_pa
...
#24_genera
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
298a2ccc64 | ||
|
|
496e5f78a5 | ||
|
|
c1e5fd1129 | ||
|
|
f6073b2954 | ||
|
|
cc1d6ba452 | ||
|
|
a5e9ac0014 | ||
|
|
1d44716a1f | ||
|
|
ba81be7088 | ||
|
|
8574486443 | ||
|
|
fca2a9f441 | ||
|
|
0730b20c52 | ||
|
|
deac943e4c | ||
|
|
80ffda8447 | ||
|
|
3b975418de | ||
|
|
b646c1f848 | ||
|
|
0fe50bc175 | ||
|
|
fa0bdf807f | ||
|
|
5b102a5ea4 | ||
|
|
006a97a17a | ||
|
|
3e567060f8 | ||
|
|
7dff2e3393 | ||
|
|
0f4a9b5ddd | ||
|
|
32d863f624 | ||
|
|
760d239d0c | ||
| cc1cf92118 | |||
|
|
917a15c60c | ||
| f01192909d | |||
|
|
03e7b95e1b | ||
| 782a1a4699 | |||
| e52682c8b9 | |||
|
|
9603046432 | ||
|
|
9027cd7202 | ||
| 7a52cce40a |
@@ -1,8 +1,10 @@
|
||||
upload_folder: "~/zordon-uploads/"
|
||||
update_engines_on_launch: true
|
||||
max_content_path: 100000000
|
||||
server_log_level: info
|
||||
flask_log_level: error
|
||||
flask_debug_enable: false
|
||||
queue_eval_seconds: 1
|
||||
port_number: 8080
|
||||
enable_split_jobs: true
|
||||
enable_split_jobs: true
|
||||
download_timeout_seconds: 120
|
||||
@@ -1,4 +0,0 @@
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print(FFMPEG.get_frame_count('/Users/brett/Desktop/Big_Fire_02.mov'))
|
||||
@@ -1,4 +1,5 @@
|
||||
requests==2.31.0
|
||||
requests_toolbelt==1.0.0
|
||||
psutil==5.9.6
|
||||
PyYAML==6.0.1
|
||||
Flask==3.0.0
|
||||
@@ -11,4 +12,6 @@ Pillow==10.1.0
|
||||
zeroconf==0.119.0
|
||||
Pypubsub~=4.0.3
|
||||
tqdm==4.66.1
|
||||
dmglib==0.9.4
|
||||
dmglib==0.9.4
|
||||
plyer==2.1.0
|
||||
pyobjus==1.2.3
|
||||
184
src/api/add_job_helpers.py
Normal file
184
src/api/add_job_helpers.py
Normal file
@@ -0,0 +1,184 @@
|
||||
#!/usr/bin/env python3
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
import zipfile
|
||||
from datetime import datetime
|
||||
|
||||
import requests
|
||||
from tqdm import tqdm
|
||||
from werkzeug.utils import secure_filename
|
||||
|
||||
from src.distributed_job_manager import DistributedJobManager
|
||||
from src.engines.core.worker_factory import RenderWorkerFactory
|
||||
from src.render_queue import RenderQueue
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
|
||||
def handle_uploaded_project_files(request, jobs_list, upload_directory):
|
||||
# Initialize default values
|
||||
loaded_project_local_path = None
|
||||
|
||||
uploaded_project = request.files.get('file', None)
|
||||
project_url = jobs_list[0].get('url', None)
|
||||
local_path = jobs_list[0].get('local_path', None)
|
||||
renderer = jobs_list[0].get('renderer')
|
||||
downloaded_file_url = None
|
||||
|
||||
if uploaded_project and uploaded_project.filename:
|
||||
referred_name = os.path.basename(uploaded_project.filename)
|
||||
elif project_url:
|
||||
referred_name, downloaded_file_url = download_project_from_url(project_url)
|
||||
if not referred_name:
|
||||
raise ValueError(f"Error downloading file from URL: {project_url}")
|
||||
elif local_path and os.path.exists(local_path):
|
||||
referred_name = os.path.basename(local_path)
|
||||
|
||||
else:
|
||||
raise ValueError("Cannot find any valid project paths")
|
||||
|
||||
# Prepare the local filepath
|
||||
cleaned_path_name = os.path.splitext(referred_name)[0].replace(' ', '_')
|
||||
job_dir = os.path.join(upload_directory, '-'.join(
|
||||
[datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), renderer, cleaned_path_name]))
|
||||
os.makedirs(job_dir, exist_ok=True)
|
||||
project_source_dir = os.path.join(job_dir, 'source')
|
||||
os.makedirs(project_source_dir, exist_ok=True)
|
||||
|
||||
# Move projects to their work directories
|
||||
if uploaded_project and uploaded_project.filename:
|
||||
loaded_project_local_path = os.path.join(project_source_dir, secure_filename(uploaded_project.filename))
|
||||
uploaded_project.save(loaded_project_local_path)
|
||||
logger.info(f"Transfer complete for {loaded_project_local_path.split(upload_directory)[-1]}")
|
||||
elif project_url:
|
||||
loaded_project_local_path = os.path.join(project_source_dir, referred_name)
|
||||
shutil.move(downloaded_file_url, loaded_project_local_path)
|
||||
logger.info(f"Download complete for {loaded_project_local_path.split(upload_directory)[-1]}")
|
||||
elif local_path:
|
||||
loaded_project_local_path = os.path.join(project_source_dir, referred_name)
|
||||
shutil.copy(local_path, loaded_project_local_path)
|
||||
logger.info(f"Import complete for {loaded_project_local_path.split(upload_directory)[-1]}")
|
||||
|
||||
return loaded_project_local_path, referred_name
|
||||
|
||||
|
||||
def download_project_from_url(project_url):
|
||||
# This nested function is to handle downloading from a URL
|
||||
logger.info(f"Downloading project from url: {project_url}")
|
||||
referred_name = os.path.basename(project_url)
|
||||
downloaded_file_url = None
|
||||
|
||||
try:
|
||||
response = requests.get(project_url, stream=True)
|
||||
if response.status_code == 200:
|
||||
# Get the total file size from the "Content-Length" header
|
||||
file_size = int(response.headers.get("Content-Length", 0))
|
||||
# Create a progress bar using tqdm
|
||||
progress_bar = tqdm(total=file_size, unit="B", unit_scale=True)
|
||||
# Open a file for writing in binary mode
|
||||
downloaded_file_url = os.path.join(tempfile.gettempdir(), referred_name)
|
||||
with open(downloaded_file_url, "wb") as file:
|
||||
for chunk in response.iter_content(chunk_size=1024):
|
||||
if chunk:
|
||||
# Write the chunk to the file
|
||||
file.write(chunk)
|
||||
# Update the progress bar
|
||||
progress_bar.update(len(chunk))
|
||||
# Close the progress bar
|
||||
progress_bar.close()
|
||||
else:
|
||||
return None, None
|
||||
except Exception as e:
|
||||
logger.error(f"Error downloading file: {e}")
|
||||
return None, None
|
||||
|
||||
return referred_name, downloaded_file_url
|
||||
|
||||
|
||||
def process_zipped_project(zip_path):
|
||||
# Given a zip path, extract its content, and return the main project file path
|
||||
work_path = os.path.dirname(zip_path)
|
||||
|
||||
try:
|
||||
with zipfile.ZipFile(zip_path, 'r') as myzip:
|
||||
myzip.extractall(work_path)
|
||||
|
||||
project_files = [x for x in os.listdir(work_path) if os.path.isfile(os.path.join(work_path, x))]
|
||||
project_files = [x for x in project_files if '.zip' not in x]
|
||||
|
||||
logger.debug(f"Zip files: {project_files}")
|
||||
|
||||
# supported_exts = RenderWorkerFactory.class_for_name(renderer).engine.supported_extensions
|
||||
# if supported_exts:
|
||||
# project_files = [file for file in project_files if any(file.endswith(ext) for ext in supported_exts)]
|
||||
|
||||
# If there's more than 1 project file or none, raise an error
|
||||
if len(project_files) != 1:
|
||||
raise ValueError(f'Cannot find a valid project file in {os.path.basename(zip_path)}')
|
||||
|
||||
extracted_project_path = os.path.join(work_path, project_files[0])
|
||||
logger.info(f"Extracted zip file to {extracted_project_path}")
|
||||
|
||||
except (zipfile.BadZipFile, zipfile.LargeZipFile) as e:
|
||||
logger.error(f"Error processing zip file: {e}")
|
||||
raise ValueError(f"Error processing zip file: {e}")
|
||||
return extracted_project_path
|
||||
|
||||
|
||||
def create_render_jobs(jobs_list, loaded_project_local_path, job_dir, enable_split_jobs=False):
|
||||
results = []
|
||||
|
||||
for job_data in jobs_list:
|
||||
try:
|
||||
# prepare output paths
|
||||
output_dir = os.path.join(job_dir, job_data.get('name') if len(jobs_list) > 1 else 'output')
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
# get new output path in output_dir
|
||||
output_path = job_data.get('output_path')
|
||||
if not output_path:
|
||||
loaded_project_filename = os.path.basename(loaded_project_local_path)
|
||||
output_filename = os.path.splitext(loaded_project_filename)[0]
|
||||
else:
|
||||
output_filename = os.path.basename(output_path)
|
||||
|
||||
output_path = os.path.join(os.path.dirname(os.path.dirname(loaded_project_local_path)), 'output',
|
||||
output_filename)
|
||||
logger.debug(f"New job output path: {output_path}")
|
||||
|
||||
# create & configure jobs
|
||||
worker = RenderWorkerFactory.create_worker(renderer=job_data['renderer'],
|
||||
input_path=loaded_project_local_path,
|
||||
output_path=output_path,
|
||||
engine_version=job_data.get('engine_version'),
|
||||
args=job_data.get('args', {}))
|
||||
worker.status = job_data.get("initial_status", worker.status)
|
||||
worker.parent = job_data.get("parent", worker.parent)
|
||||
worker.name = job_data.get("name", worker.name)
|
||||
worker.priority = int(job_data.get('priority', worker.priority))
|
||||
worker.start_frame = int(job_data.get("start_frame", worker.start_frame))
|
||||
worker.end_frame = int(job_data.get("end_frame", worker.end_frame))
|
||||
|
||||
# determine if we can / should split the job
|
||||
if enable_split_jobs and (worker.total_frames > 1) and not worker.parent:
|
||||
DistributedJobManager.split_into_subjobs(worker, job_data, loaded_project_local_path)
|
||||
else:
|
||||
logger.debug("Not splitting into subjobs")
|
||||
RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
|
||||
|
||||
if not worker.parent:
|
||||
from src.api.api_server import make_job_ready
|
||||
make_job_ready(worker.id)
|
||||
results.append(worker.json())
|
||||
except FileNotFoundError as e:
|
||||
err_msg = f"Cannot create job: {e}"
|
||||
logger.error(err_msg)
|
||||
results.append({'error': err_msg})
|
||||
except Exception as e:
|
||||
err_msg = f"Exception creating render job: {e}"
|
||||
logger.exception(err_msg)
|
||||
results.append({'error': err_msg})
|
||||
|
||||
return results
|
||||
@@ -3,31 +3,30 @@ import json
|
||||
import logging
|
||||
import os
|
||||
import pathlib
|
||||
import platform
|
||||
import shutil
|
||||
import socket
|
||||
import ssl
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
import zipfile
|
||||
from datetime import datetime
|
||||
from urllib.request import urlretrieve
|
||||
from zipfile import ZipFile
|
||||
|
||||
import json2html
|
||||
import psutil
|
||||
import yaml
|
||||
from flask import Flask, request, render_template, send_file, after_this_request, Response, redirect, url_for, abort
|
||||
from werkzeug.utils import secure_filename
|
||||
|
||||
from src.api.server_proxy import RenderServerProxy
|
||||
from src.api.add_job_helpers import handle_uploaded_project_files, process_zipped_project, create_render_jobs
|
||||
from src.distributed_job_manager import DistributedJobManager
|
||||
from src.engines.core.base_worker import string_to_status, RenderStatus
|
||||
from src.engines.core.worker_factory import RenderWorkerFactory
|
||||
from src.engines.engine_manager import EngineManager
|
||||
from src.render_queue import RenderQueue, JobNotFoundError
|
||||
from src.server_proxy import RenderServerProxy
|
||||
from src.utilities.misc_helper import system_safe_path, current_system_os, current_system_cpu, current_system_os_version
|
||||
from src.utilities.server_helper import generate_thumbnail_for_job
|
||||
from src.utilities.zeroconf_server import ZeroconfServer
|
||||
from src.workers.worker_factory import RenderWorkerFactory
|
||||
from src.workers.base_worker import string_to_status, RenderStatus
|
||||
|
||||
logger = logging.getLogger()
|
||||
server = Flask(__name__, template_folder='web/templates', static_folder='web/static')
|
||||
@@ -54,7 +53,7 @@ def sorted_jobs(all_jobs, sort_by_date=True):
|
||||
@server.route('/')
|
||||
@server.route('/index')
|
||||
def index():
|
||||
with open('config/presets.yaml') as f:
|
||||
with open(system_safe_path('config/presets.yaml')) as f:
|
||||
render_presets = yaml.load(f, Loader=yaml.FullLoader)
|
||||
|
||||
return render_template('index.html', all_jobs=sorted_jobs(RenderQueue.all_jobs()),
|
||||
@@ -131,15 +130,15 @@ def job_thumbnail(job_id):
|
||||
|
||||
# Misc status icons
|
||||
if found_job.status == RenderStatus.RUNNING:
|
||||
return send_file('web/static/images/gears.png', mimetype="image/png")
|
||||
return send_file('../web/static/images/gears.png', mimetype="image/png")
|
||||
elif found_job.status == RenderStatus.CANCELLED:
|
||||
return send_file('web/static/images/cancelled.png', mimetype="image/png")
|
||||
return send_file('../web/static/images/cancelled.png', mimetype="image/png")
|
||||
elif found_job.status == RenderStatus.SCHEDULED:
|
||||
return send_file('web/static/images/scheduled.png', mimetype="image/png")
|
||||
return send_file('../web/static/images/scheduled.png', mimetype="image/png")
|
||||
elif found_job.status == RenderStatus.NOT_STARTED:
|
||||
return send_file('web/static/images/not_started.png', mimetype="image/png")
|
||||
return send_file('../web/static/images/not_started.png', mimetype="image/png")
|
||||
# errors
|
||||
return send_file('web/static/images/error.png', mimetype="image/png")
|
||||
return send_file('../web/static/images/error.png', mimetype="image/png")
|
||||
|
||||
|
||||
# Get job file routing
|
||||
@@ -169,7 +168,7 @@ def subjob_status_change(job_id):
|
||||
try:
|
||||
subjob_details = request.json
|
||||
logger.info(f"Subjob to job id: {job_id} is now {subjob_details['status']}")
|
||||
DistributedJobManager.handle_subjob_status_change(RenderQueue.job_with_id(job_id), subjob_data=subjob_details)
|
||||
DistributedJobManager.handle_subjob_status_change(job_id, subjob_data=subjob_details)
|
||||
return Response(status=200)
|
||||
except JobNotFoundError:
|
||||
return "Job not found", 404
|
||||
@@ -188,7 +187,7 @@ def get_job_status(job_id):
|
||||
@server.get('/api/job/<job_id>/logs')
|
||||
def get_job_logs(job_id):
|
||||
found_job = RenderQueue.job_with_id(job_id)
|
||||
log_path = found_job.log_path()
|
||||
log_path = system_safe_path(found_job.log_path())
|
||||
log_data = None
|
||||
if log_path and os.path.exists(log_path):
|
||||
with open(log_path) as file:
|
||||
@@ -232,10 +231,11 @@ def download_all(job_id):
|
||||
found_job = RenderQueue.job_with_id(job_id)
|
||||
output_dir = os.path.dirname(found_job.output_path)
|
||||
if os.path.exists(output_dir):
|
||||
zip_filename = os.path.join('/tmp', pathlib.Path(found_job.input_path).stem + '.zip')
|
||||
zip_filename = system_safe_path(os.path.join(tempfile.gettempdir(),
|
||||
pathlib.Path(found_job.input_path).stem + '.zip'))
|
||||
with ZipFile(zip_filename, 'w') as zipObj:
|
||||
for f in os.listdir(output_dir):
|
||||
zipObj.write(filename=os.path.join(output_dir, f),
|
||||
zipObj.write(filename=system_safe_path(os.path.join(output_dir, f)),
|
||||
arcname=os.path.basename(f))
|
||||
return send_file(zip_filename, mimetype="zip", as_attachment=True, )
|
||||
else:
|
||||
@@ -244,7 +244,8 @@ def download_all(job_id):
|
||||
|
||||
@server.get('/api/presets')
|
||||
def presets():
|
||||
with open('config/presets.yaml') as f:
|
||||
presets_path = system_safe_path('config/presets.yaml')
|
||||
with open(presets_path) as f:
|
||||
presets = yaml.load(f, Loader=yaml.FullLoader)
|
||||
return presets
|
||||
|
||||
@@ -278,14 +279,17 @@ def detected_clients():
|
||||
return ZeroconfServer.found_clients()
|
||||
|
||||
|
||||
# New version
|
||||
@server.post('/api/add_job')
|
||||
def add_job_handler():
|
||||
# initial handling of raw data
|
||||
# Process request data
|
||||
try:
|
||||
if request.is_json:
|
||||
jobs_list = [request.json] if not isinstance(request.json, list) else request.json
|
||||
logger.debug(f"Received add_job JSON: {jobs_list}")
|
||||
elif request.form.get('json', None):
|
||||
jobs_list = json.loads(request.form['json'])
|
||||
logger.debug(f"Received add_job form: {jobs_list}")
|
||||
else:
|
||||
# Cleanup flat form data into nested structure
|
||||
form_dict = {k: v for k, v in dict(request.form).items() if v}
|
||||
@@ -299,137 +303,27 @@ def add_job_handler():
|
||||
args['raw'] = form_dict.get('raw_args', None)
|
||||
form_dict['args'] = args
|
||||
jobs_list = [form_dict]
|
||||
logger.debug(f"Received add_job data: {jobs_list}")
|
||||
except Exception as e:
|
||||
err_msg = f"Error processing job data: {e}"
|
||||
logger.error(err_msg)
|
||||
return err_msg, 500
|
||||
|
||||
# start handling project files
|
||||
try:
|
||||
# handle uploaded files
|
||||
logger.debug(f"Incoming new job request: {jobs_list}")
|
||||
uploaded_project = request.files.get('file', None)
|
||||
project_url = jobs_list[0].get('url', None)
|
||||
local_path = jobs_list[0].get('local_path', None)
|
||||
renderer = jobs_list[0].get('renderer')
|
||||
loaded_project_local_path, referred_name = handle_uploaded_project_files(request, jobs_list,
|
||||
server.config['UPLOAD_FOLDER'])
|
||||
if loaded_project_local_path.lower().endswith('.zip'):
|
||||
loaded_project_local_path = process_zipped_project(loaded_project_local_path)
|
||||
|
||||
downloaded_file_url = None
|
||||
if uploaded_project and uploaded_project.filename:
|
||||
referred_name = os.path.basename(uploaded_project.filename)
|
||||
elif project_url:
|
||||
# download and save url - have to download first to know filename due to redirects
|
||||
logger.info(f"Attempting to download URL: {project_url}")
|
||||
try:
|
||||
downloaded_file_url, info = urlretrieve(project_url)
|
||||
referred_name = info.get_filename() or os.path.basename(project_url)
|
||||
except Exception as e:
|
||||
err_msg = f"Error downloading file: {e}"
|
||||
logger.error(err_msg)
|
||||
return err_msg, 406
|
||||
elif local_path and os.path.exists(local_path):
|
||||
referred_name = os.path.basename(local_path)
|
||||
else:
|
||||
return "Cannot find any valid project paths", 400
|
||||
|
||||
# prep local filepath
|
||||
cleaned_path_name = os.path.splitext(referred_name)[0].replace(' ', '_')
|
||||
job_dir = os.path.join(server.config['UPLOAD_FOLDER'], '-'.join(
|
||||
[datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), renderer, cleaned_path_name]))
|
||||
os.makedirs(job_dir, exist_ok=True)
|
||||
upload_dir = os.path.join(job_dir, 'source')
|
||||
os.makedirs(upload_dir, exist_ok=True)
|
||||
|
||||
# move projects to their work directories
|
||||
loaded_project_local_path = None
|
||||
if uploaded_project and uploaded_project.filename:
|
||||
loaded_project_local_path = os.path.join(upload_dir, secure_filename(uploaded_project.filename))
|
||||
uploaded_project.save(loaded_project_local_path)
|
||||
logger.info(f"Transfer complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}")
|
||||
elif project_url:
|
||||
loaded_project_local_path = os.path.join(upload_dir, referred_name)
|
||||
shutil.move(downloaded_file_url, loaded_project_local_path)
|
||||
logger.info(f"Download complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}")
|
||||
elif local_path:
|
||||
loaded_project_local_path = os.path.join(upload_dir, referred_name)
|
||||
shutil.copy(local_path, loaded_project_local_path)
|
||||
logger.info(f"Import complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}")
|
||||
|
||||
# process uploaded zip files
|
||||
zip_path = loaded_project_local_path if loaded_project_local_path.lower().endswith('.zip') else None
|
||||
if zip_path:
|
||||
zip_path = loaded_project_local_path
|
||||
work_path = os.path.dirname(zip_path)
|
||||
try:
|
||||
with zipfile.ZipFile(zip_path, 'r') as myzip:
|
||||
myzip.extractall(os.path.dirname(zip_path))
|
||||
|
||||
project_files = [x for x in os.listdir(work_path) if os.path.isfile(os.path.join(work_path, x))]
|
||||
project_files = [x for x in project_files if '.zip' not in x]
|
||||
supported_exts = RenderWorkerFactory.class_for_name(renderer).engine.supported_extensions
|
||||
if supported_exts:
|
||||
project_files = [file for file in project_files if
|
||||
any(file.endswith(ext) for ext in supported_exts)]
|
||||
|
||||
if len(project_files) != 1: # we have to narrow down to 1 main project file, otherwise error
|
||||
return {'error': f'Cannot find valid project file in {os.path.basename(zip_path)}'}, 400
|
||||
|
||||
extracted_project_path = os.path.join(work_path, project_files[0])
|
||||
logger.info(f"Extracted zip file to {extracted_project_path}")
|
||||
loaded_project_local_path = extracted_project_path
|
||||
except (zipfile.BadZipFile, zipfile.LargeZipFile) as e:
|
||||
err_msg = f"Error processing zip file: {e}"
|
||||
logger.error(err_msg)
|
||||
return err_msg, 500
|
||||
|
||||
# create and add jobs to render queue
|
||||
results = []
|
||||
for job_data in jobs_list:
|
||||
try:
|
||||
# prepare output paths
|
||||
output_dir = os.path.join(job_dir, job_data.get('name') if len(jobs_list) > 1 else 'output')
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
# get new output path in output_dir
|
||||
job_data['output_path'] = os.path.join(output_dir, os.path.basename(
|
||||
job_data.get('output_path', None) or loaded_project_local_path
|
||||
))
|
||||
|
||||
# create & configure jobs
|
||||
worker = RenderWorkerFactory.create_worker(renderer=job_data['renderer'],
|
||||
input_path=loaded_project_local_path,
|
||||
output_path=job_data["output_path"],
|
||||
args=job_data.get('args', {}))
|
||||
worker.status = job_data.get("initial_status", worker.status)
|
||||
worker.parent = job_data.get("parent", worker.parent)
|
||||
worker.name = job_data.get("name", worker.name)
|
||||
worker.priority = int(job_data.get('priority', worker.priority))
|
||||
worker.start_frame = int(job_data.get("start_frame", worker.start_frame))
|
||||
worker.end_frame = int(job_data.get("end_frame", worker.end_frame))
|
||||
|
||||
# determine if we can / should split the job
|
||||
if server.config.get('enable_split_jobs', False) and (worker.total_frames > 1) and not worker.parent:
|
||||
DistributedJobManager.split_into_subjobs(worker, job_data, zip_path or loaded_project_local_path)
|
||||
|
||||
RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
|
||||
if not worker.parent:
|
||||
make_job_ready(worker.id)
|
||||
results.append(worker.json())
|
||||
except Exception as e:
|
||||
err_msg = f"Exception creating render job: {e}"
|
||||
logger.exception(err_msg)
|
||||
results.append({'error': err_msg})
|
||||
|
||||
# return any errors from results list
|
||||
results = create_render_jobs(jobs_list, loaded_project_local_path, referred_name,
|
||||
server.config['enable_split_jobs'])
|
||||
for response in results:
|
||||
if response.get('error', None):
|
||||
return results, 400
|
||||
|
||||
# redirect to index if requested
|
||||
if request.args.get('redirect', False):
|
||||
return redirect(url_for('index'))
|
||||
else:
|
||||
return results, 200
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Unknown error adding job: {e}")
|
||||
return 'unknown error', 500
|
||||
@@ -500,21 +394,17 @@ def clear_history():
|
||||
def status():
|
||||
renderer_data = {}
|
||||
for render_class in RenderWorkerFactory.supported_classes():
|
||||
if render_class.engine.default_renderer_path(): # only return renderers installed on host
|
||||
if EngineManager.all_versions_for_engine(render_class.name): # only return renderers installed on host
|
||||
renderer_data[render_class.engine.name()] = \
|
||||
{'versions': EngineManager.all_versions_for_engine(render_class.engine.name()),
|
||||
'is_available': RenderQueue.is_available_for_job(render_class.engine.name())
|
||||
}
|
||||
|
||||
# Get system info
|
||||
system_platform = platform.system().lower().replace('darwin', 'macos')
|
||||
system_platform_version = platform.mac_ver()[0] if system_platform == 'macos' else platform.release().lower()
|
||||
system_cpu = platform.machine().lower().replace('amd64', 'x64')
|
||||
|
||||
return {"timestamp": datetime.now().isoformat(),
|
||||
"system_platform": system_platform,
|
||||
"system_platform_version": system_platform_version,
|
||||
"system_cpu": system_cpu,
|
||||
"system_os": current_system_os(),
|
||||
"system_os_version": current_system_os_version(),
|
||||
"system_cpu": current_system_cpu(),
|
||||
"cpu_percent": psutil.cpu_percent(percpu=False),
|
||||
"cpu_percent_per_cpu": psutil.cpu_percent(percpu=True),
|
||||
"cpu_count": psutil.cpu_count(logical=False),
|
||||
@@ -533,23 +423,53 @@ def renderer_info():
|
||||
renderer_data = {}
|
||||
for engine_name in RenderWorkerFactory.supported_renderers():
|
||||
engine = RenderWorkerFactory.class_for_name(engine_name).engine
|
||||
if engine.default_renderer_path():
|
||||
|
||||
# Get all installed versions of engine
|
||||
# Get all installed versions of engine
|
||||
installed_versions = EngineManager.all_versions_for_engine(engine_name)
|
||||
if installed_versions:
|
||||
install_path = installed_versions[0]['path']
|
||||
renderer_data[engine_name] = {'is_available': RenderQueue.is_available_for_job(engine.name()),
|
||||
'versions': EngineManager.all_versions_for_engine(engine_name),
|
||||
'versions': installed_versions,
|
||||
'supported_extensions': engine.supported_extensions,
|
||||
'supported_export_formats': engine().get_output_formats()}
|
||||
'supported_export_formats': engine(install_path).get_output_formats()}
|
||||
return renderer_data
|
||||
|
||||
|
||||
@server.get('/api/<engine_name>/is_available')
|
||||
def is_engine_available(engine_name):
|
||||
return {'engine': engine_name, 'available': RenderQueue.is_available_for_job(engine_name),
|
||||
'cpu_count': int(psutil.cpu_count(logical=False)),
|
||||
'versions': EngineManager.all_versions_for_engine(engine_name),
|
||||
'hostname': server.config['HOSTNAME']}
|
||||
|
||||
|
||||
@server.get('/api/is_engine_available_to_download')
|
||||
def is_engine_available_to_download():
|
||||
available_result = EngineManager.version_is_available_to_download(request.args.get('engine'),
|
||||
request.args.get('version'),
|
||||
request.args.get('system_os'),
|
||||
request.args.get('cpu'))
|
||||
return available_result if available_result else \
|
||||
(f"Cannot find available download for {request.args.get('engine')} {request.args.get('version')}", 500)
|
||||
|
||||
|
||||
@server.get('/api/find_most_recent_version')
|
||||
def find_most_recent_version():
|
||||
most_recent = EngineManager.find_most_recent_version(request.args.get('engine'),
|
||||
request.args.get('system_os'),
|
||||
request.args.get('cpu'))
|
||||
return most_recent if most_recent else \
|
||||
(f"Error finding most recent version of {request.args.get('engine')}", 500)
|
||||
|
||||
|
||||
@server.post('/api/download_engine')
|
||||
def download_engine():
|
||||
download_result = EngineManager.download_engine(request.args.get('engine'),
|
||||
request.args.get('version'),
|
||||
request.args.get('system_os'),
|
||||
request.args.get('cpu'))
|
||||
return download_result if download_result else ("Error downloading requested engine", 500)
|
||||
return download_result if download_result else \
|
||||
(f"Error downloading {request.args.get('engine')} {request.args.get('version')}", 500)
|
||||
|
||||
|
||||
@server.post('/api/delete_engine')
|
||||
@@ -558,7 +478,8 @@ def delete_engine_download():
|
||||
request.args.get('version'),
|
||||
request.args.get('system_os'),
|
||||
request.args.get('cpu'))
|
||||
return "Success" if delete_result else ("Error deleting requested engine", 500)
|
||||
return "Success" if delete_result else \
|
||||
(f"Error deleting {request.args.get('engine')} {request.args.get('version')}", 500)
|
||||
|
||||
|
||||
@server.get('/api/renderer/<renderer>/args')
|
||||
@@ -581,7 +502,7 @@ def start_server(background_thread=False):
|
||||
RenderQueue.evaluate_queue()
|
||||
time.sleep(delay_sec)
|
||||
|
||||
with open('config/config.yaml') as f:
|
||||
with open(system_safe_path('config/config.yaml')) as f:
|
||||
config = yaml.load(f, Loader=yaml.FullLoader)
|
||||
|
||||
logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S',
|
||||
@@ -594,19 +515,28 @@ def start_server(background_thread=False):
|
||||
# load flask settings
|
||||
server.config['HOSTNAME'] = local_hostname
|
||||
server.config['PORT'] = int(config.get('port_number', 8080))
|
||||
server.config['UPLOAD_FOLDER'] = os.path.expanduser(config['upload_folder'])
|
||||
server.config['THUMBS_FOLDER'] = os.path.join(os.path.expanduser(config['upload_folder']), 'thumbs')
|
||||
server.config['UPLOAD_FOLDER'] = system_safe_path(os.path.expanduser(config['upload_folder']))
|
||||
server.config['THUMBS_FOLDER'] = system_safe_path(os.path.join(os.path.expanduser(config['upload_folder']), 'thumbs'))
|
||||
server.config['MAX_CONTENT_PATH'] = config['max_content_path']
|
||||
server.config['enable_split_jobs'] = config.get('enable_split_jobs', False)
|
||||
|
||||
# Setup directory for saving engines to
|
||||
EngineManager.engines_path = os.path.join(os.path.join(os.path.expanduser(config['upload_folder']), 'engines'))
|
||||
EngineManager.engines_path = system_safe_path(os.path.join(os.path.join(os.path.expanduser(config['upload_folder']), 'engines')))
|
||||
os.makedirs(EngineManager.engines_path, exist_ok=True)
|
||||
|
||||
# Debug info
|
||||
logger.debug(f"Upload directory: {server.config['UPLOAD_FOLDER']}")
|
||||
logger.debug(f"Thumbs directory: {server.config['THUMBS_FOLDER']}")
|
||||
logger.debug(f"Engines directory: {EngineManager.engines_path}")
|
||||
|
||||
# disable most Flask logging
|
||||
flask_log = logging.getLogger('werkzeug')
|
||||
flask_log.setLevel(config.get('flask_log_level', 'ERROR').upper())
|
||||
|
||||
# check for updates for render engines if config'd or on first launch
|
||||
if config.get('update_engines_on_launch', False) or not EngineManager.all_engines():
|
||||
EngineManager.update_all_engines()
|
||||
|
||||
# Set up the RenderQueue object
|
||||
RenderQueue.start_queue()
|
||||
DistributedJobManager.start()
|
||||
@@ -614,7 +544,7 @@ def start_server(background_thread=False):
|
||||
thread = threading.Thread(target=eval_loop, kwargs={'delay_sec': config.get('queue_eval_seconds', 1)}, daemon=True)
|
||||
thread.start()
|
||||
|
||||
logging.info(f"Starting Zordon Render Server - Hostname: '{server.config['HOSTNAME']}:'")
|
||||
logger.info(f"Starting Zordon Render Server - Hostname: '{server.config['HOSTNAME']}:'")
|
||||
ZeroconfServer.configure("_zordon._tcp.local.", server.config['HOSTNAME'], server.config['PORT'])
|
||||
ZeroconfServer.start()
|
||||
|
||||
@@ -4,6 +4,7 @@ import os
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime
|
||||
|
||||
import requests
|
||||
from requests_toolbelt.multipart import MultipartEncoder, MultipartEncoderMonitor
|
||||
@@ -33,6 +34,9 @@ class RenderServerProxy:
|
||||
self.__background_thread = None
|
||||
self.__offline_flags = 0
|
||||
self.update_cadence = 5
|
||||
self.last_contact = datetime.now()
|
||||
# to prevent errors, the last contact datetime is set to when the class is initialized - you must keep an
|
||||
# instance of this class alive to accurately know the delay
|
||||
|
||||
def connect(self):
|
||||
status = self.request_data('status')
|
||||
@@ -55,6 +59,7 @@ class RenderServerProxy:
|
||||
req = self.request(payload, timeout)
|
||||
if req.ok and req.status_code == 200:
|
||||
self.__offline_flags = 0
|
||||
self.last_contact = datetime.now()
|
||||
return req.json()
|
||||
except json.JSONDecodeError as e:
|
||||
logger.debug(f"JSON decode error: {e}")
|
||||
@@ -116,6 +121,9 @@ class RenderServerProxy:
|
||||
def get_status(self):
|
||||
return self.request_data('status')
|
||||
|
||||
def is_engine_available(self, engine_name):
|
||||
return self.request_data(f'{engine_name}/is_available')
|
||||
|
||||
def notify_parent_of_status_change(self, parent_id, subjob):
|
||||
return requests.post(f'http://{self.hostname}:{self.port}/api/job/{parent_id}/notify_parent_of_status_change',
|
||||
json=subjob.json())
|
||||
@@ -11,10 +11,10 @@ from PIL import Image, ImageTk
|
||||
|
||||
from src.client.new_job_window import NewJobWindow
|
||||
# from src.client.server_details import create_server_popup
|
||||
from src.server_proxy import RenderServerProxy
|
||||
from src.api.server_proxy import RenderServerProxy
|
||||
from src.utilities.misc_helper import launch_url, file_exists_in_mounts, get_time_elapsed
|
||||
from src.utilities.zeroconf_server import ZeroconfServer
|
||||
from src.workers.base_worker import RenderStatus
|
||||
from src.engines.core.base_worker import RenderStatus
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
|
||||
@@ -11,9 +11,9 @@ from tkinter.ttk import Frame, Label, Entry, Combobox, Progressbar
|
||||
|
||||
import psutil
|
||||
|
||||
from src.server_proxy import RenderServerProxy
|
||||
from src.workers.blender_worker import Blender
|
||||
from src.workers.ffmpeg_worker import FFMPEG
|
||||
from src.api.server_proxy import RenderServerProxy
|
||||
from src.engines.blender.blender_worker import Blender
|
||||
from src.engines.ffmpeg.ffmpeg_worker import FFMPEG
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
|
||||
@@ -1,13 +1,15 @@
|
||||
import datetime
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
import time
|
||||
import zipfile
|
||||
|
||||
from plyer import notification
|
||||
from pubsub import pub
|
||||
|
||||
from src.api.server_proxy import RenderServerProxy
|
||||
from src.render_queue import RenderQueue
|
||||
from src.server_proxy import RenderServerProxy
|
||||
from src.utilities.misc_helper import get_file_size_human
|
||||
from src.utilities.status_utils import RenderStatus, string_to_status
|
||||
from src.utilities.zeroconf_server import ZeroconfServer
|
||||
@@ -51,43 +53,93 @@ class DistributedJobManager:
|
||||
parent_id, hostname = render_job.parent.split('@')[0], render_job.parent.split('@')[-1]
|
||||
RenderServerProxy(hostname).notify_parent_of_status_change(parent_id=parent_id, subjob=render_job)
|
||||
|
||||
elif render_job.children and new_status == RenderStatus.CANCELLED:
|
||||
# todo: handle cancelling all the children
|
||||
pass
|
||||
# handle cancelling all the children
|
||||
elif render_job.children and new_status in [RenderStatus.CANCELLED, RenderStatus.ERROR]:
|
||||
for child in render_job.children:
|
||||
child_id, hostname = child.split('@')
|
||||
RenderServerProxy(hostname).cancel_job(child_id, confirm=True)
|
||||
|
||||
# UI Notifications
|
||||
try:
|
||||
if new_status == RenderStatus.COMPLETED:
|
||||
logger.debug("show render complete notification")
|
||||
notification.notify(
|
||||
title='Render Job Complete',
|
||||
message=f'{render_job.name} completed succesfully',
|
||||
timeout=10 # Display time in seconds
|
||||
)
|
||||
elif new_status == RenderStatus.ERROR:
|
||||
logger.debug("show render complete notification")
|
||||
notification.notify(
|
||||
title='Render Job Failed',
|
||||
message=f'{render_job.name} failed rendering',
|
||||
timeout=10 # Display time in seconds
|
||||
)
|
||||
elif new_status == RenderStatus.RUNNING:
|
||||
logger.debug("show render complete notification")
|
||||
notification.notify(
|
||||
title='Render Job Started',
|
||||
message=f'{render_job.name} started rendering',
|
||||
timeout=10 # Display time in seconds
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug(f"Unable to show UI notification: {e}")
|
||||
|
||||
@classmethod
|
||||
def handle_subjob_status_change(cls, local_job, subjob_data):
|
||||
def handle_subjob_status_change(cls, parent_job_id, subjob_data):
|
||||
"""
|
||||
Responds to a status change from a remote subjob and triggers the creation or modification of subjobs as needed.
|
||||
|
||||
Parameters:
|
||||
local_job (BaseRenderWorker): The local parent job worker.
|
||||
subjob_data (dict): subjob data sent from remote server.
|
||||
local_job_id (str): ID for local parent job worker.
|
||||
subjob_data (dict): Subjob data sent from the remote server.
|
||||
|
||||
Returns:
|
||||
None
|
||||
"""
|
||||
|
||||
subjob_status = string_to_status(subjob_data['status'])
|
||||
parent_job = RenderQueue.job_with_id(parent_job_id)
|
||||
subjob_id = subjob_data['id']
|
||||
subjob_hostname = next((hostname.split('@')[1] for hostname in local_job.children if
|
||||
subjob_hostname = next((hostname.split('@')[1] for hostname in parent_job.children if
|
||||
hostname.split('@')[0] == subjob_id), None)
|
||||
local_job.children[f'{subjob_id}@{subjob_hostname}'] = subjob_data
|
||||
subjob_key = f'{subjob_id}@{subjob_hostname}'
|
||||
|
||||
logname = f"{local_job.id}:{subjob_id}@{subjob_hostname}"
|
||||
# Update the local job's subjob data
|
||||
parent_job.children = dict(parent_job.children) # copy as dict to work around sqlalchemy update issue
|
||||
parent_job.children[subjob_key] = subjob_data
|
||||
|
||||
logname = f"{parent_job_id}:{subjob_key}"
|
||||
subjob_status = string_to_status(subjob_data['status'])
|
||||
logger.debug(f"Subjob status changed: {logname} -> {subjob_status.value}")
|
||||
|
||||
# Download complete or partial render jobs
|
||||
if subjob_status in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR] and \
|
||||
subjob_data['file_count']:
|
||||
download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname)
|
||||
if not download_result:
|
||||
# todo: handle error
|
||||
# Handle downloading for completed, cancelled, or error'd subjobs
|
||||
if (subjob_status in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR]
|
||||
and subjob_data['file_count']):
|
||||
if not cls.download_from_subjob(parent_job, subjob_id, subjob_hostname):
|
||||
logger.error(f"Unable to download subjob files from {logname} with status {subjob_status.value}")
|
||||
|
||||
# Handle cancelled or errored subjobs by determining missing frames and scheduling a new job
|
||||
if subjob_status == RenderStatus.CANCELLED or subjob_status == RenderStatus.ERROR:
|
||||
# todo: determine missing frames and schedule new job
|
||||
pass
|
||||
logger.info("Creating a new subjob")
|
||||
cls.new_create_subjob(parent_job.id, socket.gethostname(),
|
||||
parent_job.children[subjob_key]['start_frame'],
|
||||
parent_job.children[subjob_key]['end_frame'])
|
||||
# todo: determine why we don't wait for the new subjobs we create when replacing an error'd job
|
||||
|
||||
@staticmethod
|
||||
def determine_missing_frames(parent_job_id):
|
||||
"""
|
||||
Determine missing frames in the subjob.
|
||||
|
||||
Parameters:
|
||||
subjob_data (dict): Subjob data.
|
||||
|
||||
Returns:
|
||||
list: List of missing frame numbers.
|
||||
"""
|
||||
# todo: Implement the logic to determine missing frames based on subjob_data
|
||||
missing_frames = []
|
||||
return missing_frames
|
||||
|
||||
|
||||
@staticmethod
|
||||
def download_from_subjob(local_job, subjob_id, subjob_hostname):
|
||||
@@ -109,13 +161,11 @@ class DistributedJobManager:
|
||||
|
||||
# download zip file from server
|
||||
try:
|
||||
local_job.children[child_key]['download_status'] = 'working'
|
||||
logger.info(f"Downloading completed subjob files from {subjob_hostname} to localhost")
|
||||
RenderServerProxy(subjob_hostname).get_job_files(subjob_id, zip_file_path)
|
||||
logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}")
|
||||
except Exception as e:
|
||||
logger.exception(f"Exception downloading files from remote server: {e}")
|
||||
local_job.children[child_key]['download_status'] = 'failed'
|
||||
return False
|
||||
|
||||
# extract zip
|
||||
@@ -126,128 +176,173 @@ class DistributedJobManager:
|
||||
zip_ref.extractall(extract_path)
|
||||
logger.info(f"Successfully extracted zip to: {extract_path}")
|
||||
os.remove(zip_file_path)
|
||||
local_job.children[child_key]['download_status'] = 'complete'
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.exception(f"Exception extracting zip file: {e}")
|
||||
local_job.children[child_key]['download_status'] = 'failed'
|
||||
return False
|
||||
|
||||
return local_job.children[child_key].get('download_status', None) == 'complete'
|
||||
|
||||
@classmethod
|
||||
def wait_for_subjobs(cls, local_job):
|
||||
logger.debug(f"Waiting for subjobs for job {local_job}")
|
||||
local_job.status = RenderStatus.WAITING_FOR_SUBJOBS
|
||||
statuses_to_download = [RenderStatus.CANCELLED, RenderStatus.ERROR, RenderStatus.COMPLETED]
|
||||
def wait_for_subjobs(cls, parent_job):
|
||||
"""
|
||||
Wait for subjobs to complete and update the parent job's status.
|
||||
|
||||
def subjobs_not_downloaded():
|
||||
return {k: v for k, v in local_job.children.items() if 'download_status' not in v or
|
||||
v['download_status'] == 'working' or v['download_status'] is None}
|
||||
This method continuously checks the status of subjobs until all of them are either completed, canceled, or in error
|
||||
status. It updates the parent job's children with the latest subjob information.
|
||||
|
||||
logger.info(f'Waiting on {len(subjobs_not_downloaded())} subjobs for {local_job.id}')
|
||||
Parameters:
|
||||
parent_job (BaseRenderWorker): The parent job worker.
|
||||
|
||||
while len(subjobs_not_downloaded()):
|
||||
for child_key, subjob_cached_data in subjobs_not_downloaded().items():
|
||||
Returns:
|
||||
None
|
||||
"""
|
||||
logger.debug(f"Waiting for subjobs for job {parent_job}")
|
||||
parent_job.status = RenderStatus.WAITING_FOR_SUBJOBS
|
||||
server_proxys = {}
|
||||
|
||||
subjob_id = child_key.split('@')[0]
|
||||
subjob_hostname = child_key.split('@')[-1]
|
||||
def fetch_subjob_info(child_key):
|
||||
"""
|
||||
Fetch subjob information from the remote server using a RenderServerProxy.
|
||||
|
||||
Parameters:
|
||||
child_key (str): The key representing the subjob.
|
||||
|
||||
Returns:
|
||||
dict: Subjob information.
|
||||
"""
|
||||
subjob_id, subjob_hostname = child_key.split('@')
|
||||
if subjob_hostname not in server_proxys:
|
||||
server_proxys[subjob_hostname] = RenderServerProxy(subjob_hostname)
|
||||
return server_proxys[subjob_hostname].get_job_info(subjob_id)
|
||||
|
||||
while True:
|
||||
incomplete_jobs = {}
|
||||
|
||||
for child_key in list(
|
||||
parent_job.children.keys()): # Create a list to avoid dictionary modification during iteration
|
||||
subjob_data = fetch_subjob_info(child_key)
|
||||
|
||||
# Fetch info from server and handle failing case
|
||||
subjob_data = RenderServerProxy(subjob_hostname).get_job_info(subjob_id)
|
||||
if not subjob_data:
|
||||
logger.warning(f"No response from: {subjob_hostname}")
|
||||
# todo: handle timeout / missing server situations
|
||||
subjob_id, subjob_hostname = child_key.split('@')
|
||||
last_connection = datetime.datetime.now() - server_proxys[subjob_hostname].last_contact
|
||||
logger.warning(f"No response from: {subjob_hostname} - Last connection: {last_connection}")
|
||||
|
||||
last_connection_max_time = 12
|
||||
if last_connection.seconds > last_connection_max_time:
|
||||
logger.error(
|
||||
f"{subjob_hostname} has been offline for over {last_connection_max_time} seconds - Assuming render failed")
|
||||
logger.warning(f"Spinning up a new subjob to replace the offlined server")
|
||||
parent_job.children[child_key]['errors'] = ['Renderer went offline']
|
||||
parent_job.children[child_key]['status'] = RenderStatus.ERROR
|
||||
|
||||
cls.handle_subjob_status_change(parent_job_id=parent_job.id,
|
||||
subjob_data=parent_job.children[child_key])
|
||||
continue
|
||||
|
||||
# Update parent job cache but keep the download status
|
||||
download_status = local_job.children[child_key].get('download_status', None)
|
||||
local_job.children[child_key] = subjob_data
|
||||
local_job.children[child_key]['download_status'] = download_status
|
||||
parent_job.children[child_key] = subjob_data
|
||||
|
||||
status = string_to_status(subjob_data.get('status', ''))
|
||||
status_msg = f"Subjob {child_key} | {status} | " \
|
||||
f"{float(subjob_data.get('percent_complete')) * 100.0}%"
|
||||
status_msg = f"Subjob {child_key} | {status} | {float(subjob_data.get('percent_complete', 0)) * 100.0}%"
|
||||
logger.debug(status_msg)
|
||||
|
||||
# Still working in another thread - keep waiting
|
||||
if download_status == 'working':
|
||||
continue
|
||||
if status not in [RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.ERROR]:
|
||||
incomplete_jobs[child_key] = subjob_data
|
||||
|
||||
# Check if job is finished, but has not had files copied yet over yet
|
||||
if download_status is None and subjob_data['file_count'] and status in statuses_to_download:
|
||||
download_result = cls.download_from_subjob(local_job, subjob_id, subjob_hostname)
|
||||
if not download_result:
|
||||
logger.error("Failed to download from subjob")
|
||||
# todo: error handling here
|
||||
|
||||
# Any finished jobs not successfully downloaded at this point are skipped
|
||||
if local_job.children[child_key].get('download_status', None) is None and \
|
||||
status in statuses_to_download:
|
||||
logger.warning(f"Skipping waiting on downloading from subjob: {child_key}")
|
||||
local_job.children[child_key]['download_status'] = 'skipped'
|
||||
|
||||
if subjobs_not_downloaded():
|
||||
logger.debug(f"Waiting on {len(subjobs_not_downloaded())} subjobs on "
|
||||
f"{', '.join(list(subjobs_not_downloaded().keys()))}")
|
||||
time.sleep(5)
|
||||
if incomplete_jobs:
|
||||
logger.debug(f"Waiting on {len(incomplete_jobs)} subjobs on {', '.join(list(incomplete_jobs.keys()))}")
|
||||
else:
|
||||
logger.debug("No more incomplete subjobs")
|
||||
if not cls.completion_hold_enabled:
|
||||
break
|
||||
time.sleep(5)
|
||||
|
||||
@classmethod
|
||||
def split_into_subjobs(cls, worker, job_data, project_path):
|
||||
def split_into_subjobs(cls, parent_worker, job_data, project_path):
|
||||
|
||||
# Check availability
|
||||
available_servers = cls.find_available_servers(worker.renderer)
|
||||
subjob_servers = cls.distribute_server_work(worker.start_frame, worker.end_frame, available_servers)
|
||||
available_servers = cls.find_available_servers(parent_worker.renderer)
|
||||
logger.debug(f"Splitting into subjobs - Available servers: {available_servers}")
|
||||
subjob_frame_ranges = cls.distribute_server_work(parent_worker.start_frame, parent_worker.end_frame, available_servers)
|
||||
local_hostname = socket.gethostname()
|
||||
|
||||
# Prep and submit these sub-jobs
|
||||
logger.info(f"Job {worker.id} split plan: {subjob_servers}")
|
||||
simple_ranges = [f"{x['hostname']}:[{x['frame_range'][0]}-{x['frame_range'][1]}]" for x in subjob_frame_ranges]
|
||||
logger.info(f"Job {parent_worker.id} split plan: {','.join(simple_ranges)}")
|
||||
try:
|
||||
for server_data in subjob_servers:
|
||||
server_hostname = server_data['hostname']
|
||||
|
||||
# setup parent render job first - truncate frames
|
||||
local_range = [x for x in subjob_frame_ranges if x['hostname'] == local_hostname][0]
|
||||
parent_worker.start_frame = max(local_range['frame_range'][0], parent_worker.start_frame)
|
||||
parent_worker.end_frame = min(local_range['frame_range'][-1], parent_worker.end_frame)
|
||||
logger.info(f"Local job now rendering from {parent_worker.start_frame} to {parent_worker.end_frame}")
|
||||
RenderQueue.add_to_render_queue(parent_worker) # add range-adjusted parent to render queue
|
||||
|
||||
# setup remote subjobs
|
||||
submission_results = {}
|
||||
for subjob_server_data in subjob_frame_ranges:
|
||||
server_hostname = subjob_server_data['hostname']
|
||||
if server_hostname != local_hostname:
|
||||
post_results = cls.__create_subjob(job_data, local_hostname, project_path, server_data,
|
||||
server_hostname, worker)
|
||||
post_results = cls.new_create_subjob(parent_worker.id, server_hostname,
|
||||
subjob_server_data['frame_range'][0],
|
||||
subjob_server_data['frame_range'][-1])
|
||||
|
||||
if post_results.ok:
|
||||
server_data['submission_results'] = post_results.json()[0]
|
||||
subjob_server_data['submission_results'] = post_results.json()[0]
|
||||
else:
|
||||
logger.error(f"Failed to create subjob on {server_hostname}")
|
||||
break
|
||||
else:
|
||||
# truncate parent render_job
|
||||
worker.start_frame = max(server_data['frame_range'][0], worker.start_frame)
|
||||
worker.end_frame = min(server_data['frame_range'][-1], worker.end_frame)
|
||||
logger.info(f"Local job now rendering from {worker.start_frame} to {worker.end_frame}")
|
||||
server_data['submission_results'] = worker.json()
|
||||
subjob_server_data['submission_results'] = [True]
|
||||
|
||||
# check that job posts were all successful.
|
||||
if not all(d.get('submission_results') is not None for d in subjob_servers):
|
||||
raise ValueError("Failed to create all subjobs") # look into recalculating job #s and use exising jobs
|
||||
# if not all(d.get('submission_results') is not None for d in subjob_frame_ranges):
|
||||
# # todo: rewrite this code - should not have to have all submissions go through
|
||||
# raise ValueError("Failed to create all subjobs") # look into recalculating job #s and use exising jobs
|
||||
|
||||
# start subjobs
|
||||
logger.debug(f"Starting {len(subjob_servers) - 1} attempted subjobs")
|
||||
for server_data in subjob_servers:
|
||||
if server_data['hostname'] != local_hostname:
|
||||
child_key = f"{server_data['submission_results']['id']}@{server_data['hostname']}"
|
||||
worker.children[child_key] = server_data['submission_results']
|
||||
worker.name = f"{worker.name}[{worker.start_frame}-{worker.end_frame}]"
|
||||
logger.debug(f"Starting {len(subjob_frame_ranges) - 1} attempted subjobs")
|
||||
for subjob_server_data in subjob_frame_ranges:
|
||||
if subjob_server_data['hostname'] != local_hostname:
|
||||
child_key = f"{subjob_server_data['submission_results']['id']}@{subjob_server_data['hostname']}"
|
||||
parent_worker.children[child_key] = subjob_server_data['submission_results']
|
||||
parent_worker.name = f"{parent_worker.name}[{parent_worker.start_frame}-{parent_worker.end_frame}]"
|
||||
|
||||
except Exception as e:
|
||||
# cancel all the subjobs
|
||||
logger.error(f"Failed to split job into subjobs: {e}")
|
||||
logger.debug(f"Cancelling {len(subjob_servers) - 1} attempted subjobs")
|
||||
logger.exception(f"Failed to split job into subjobs: {e}")
|
||||
logger.debug(f"Cancelling {len(subjob_frame_ranges) - 1} attempted subjobs")
|
||||
# [RenderServerProxy(hostname).cancel_job(results['id'], confirm=True) for hostname, results in
|
||||
# submission_results.items()] # todo: fix this
|
||||
|
||||
@staticmethod
|
||||
def __create_subjob(job_data, local_hostname, project_path, server_data, server_hostname, worker):
|
||||
subjob = job_data.copy()
|
||||
subjob['name'] = f"{worker.name}[{server_data['frame_range'][0]}-{server_data['frame_range'][-1]}]"
|
||||
subjob['parent'] = f"{worker.id}@{local_hostname}"
|
||||
subjob['start_frame'] = server_data['frame_range'][0]
|
||||
subjob['end_frame'] = server_data['frame_range'][-1]
|
||||
logger.debug(f"Posting subjob with frames {subjob['start_frame']}-"
|
||||
f"{subjob['end_frame']} to {server_hostname}")
|
||||
post_results = RenderServerProxy(server_hostname).post_job_to_server(
|
||||
file_path=project_path, job_list=[subjob])
|
||||
def new_create_subjob(parent_job_id, remote_hostname, start_frame, end_frame):
|
||||
"""
|
||||
Create and post a subjob to a remote render server.
|
||||
|
||||
Parameters:
|
||||
- parent_job_id (str): ID of the parent job.
|
||||
- remote_hostname (str): Remote server's hostname/address.
|
||||
- start_frame (int): Starting frame of the subjob.
|
||||
- end_frame (int): Ending frame of the subjob.
|
||||
|
||||
Example:
|
||||
new_create_subjob('parent_job_123', 'remote-server.example.com', 1, 100)
|
||||
"""
|
||||
logger.info(f"parentID: {parent_job_id}")
|
||||
local_hostname = socket.gethostname()
|
||||
parent_job = RenderQueue.job_with_id(parent_job_id)
|
||||
subjob_data = {'renderer': parent_job.engine.name(), 'input_path': parent_job.input_path,
|
||||
'args': parent_job.args, 'output_path': parent_job.output_path,
|
||||
'engine_version': parent_job.renderer_version, 'start_frame': start_frame,
|
||||
'end_frame': end_frame, 'parent': f"{parent_job_id}@{local_hostname}"}
|
||||
|
||||
logger.info(f"Creating subjob {os.path.basename(parent_job.input_path)} [{start_frame}-{end_frame}] "
|
||||
f"for {remote_hostname}")
|
||||
post_results = RenderServerProxy(remote_hostname).post_job_to_server(
|
||||
file_path=parent_job.input_path, job_list=[subjob_data])
|
||||
post_results_json = post_results.json()[0]
|
||||
|
||||
parent_job.children[f"{post_results_json['id']}@{remote_hostname}"] = post_results_json
|
||||
return post_results
|
||||
|
||||
@staticmethod
|
||||
@@ -323,17 +418,17 @@ class DistributedJobManager:
|
||||
return server_breakdown
|
||||
|
||||
@staticmethod
|
||||
def find_available_servers(renderer):
|
||||
def find_available_servers(engine_name):
|
||||
"""
|
||||
Scan the Zeroconf network for currently available render servers supporting a specific renderer.
|
||||
Scan the Zeroconf network for currently available render servers supporting a specific engine.
|
||||
|
||||
:param renderer: str, The renderer type to search for
|
||||
:param engine_name: str, The engine type to search for
|
||||
:return: A list of dictionaries with each dict containing hostname and cpu_count of available servers
|
||||
"""
|
||||
available_servers = []
|
||||
for hostname in ZeroconfServer.found_clients():
|
||||
response = RenderServerProxy(hostname).get_status()
|
||||
if response and response.get('renderers', {}).get(renderer, {}).get('is_available', False):
|
||||
available_servers.append({'hostname': hostname, 'cpu_count': int(response['cpu_count'])})
|
||||
response = RenderServerProxy(hostname).is_engine_available(engine_name)
|
||||
if response and response.get('available', False):
|
||||
available_servers.append(response)
|
||||
|
||||
return available_servers
|
||||
|
||||
@@ -1,7 +1,4 @@
|
||||
try:
|
||||
from .base_engine import *
|
||||
except ImportError:
|
||||
from base_engine import *
|
||||
from src.engines.core.base_engine import BaseRenderEngine
|
||||
|
||||
|
||||
class AERender(BaseRenderEngine):
|
||||
@@ -6,8 +6,8 @@ import os
|
||||
import re
|
||||
import time
|
||||
|
||||
from src.workers.base_worker import BaseRenderWorker, timecode_to_frames
|
||||
from src.engines.aerender_engine import AERender
|
||||
from src.engines.core.base_worker import BaseRenderWorker, timecode_to_frames
|
||||
from src.engines.aerender.aerender_engine import AERender
|
||||
|
||||
|
||||
def aerender_path():
|
||||
@@ -1,11 +1,10 @@
|
||||
import logging
|
||||
import os
|
||||
import platform
|
||||
import re
|
||||
|
||||
import requests
|
||||
|
||||
from .downloader_core import download_and_extract_app
|
||||
from src.engines.core.downloader_core import download_and_extract_app
|
||||
from src.utilities.misc_helper import current_system_os, current_system_cpu
|
||||
|
||||
# url = "https://download.blender.org/release/"
|
||||
url = "https://ftp.nluug.nl/pub/graphics/blender/release/" # much faster mirror for testing
|
||||
@@ -19,7 +18,7 @@ class BlenderDownloader:
|
||||
@staticmethod
|
||||
def get_major_versions():
|
||||
try:
|
||||
response = requests.get(url)
|
||||
response = requests.get(url, timeout=5)
|
||||
response.raise_for_status()
|
||||
|
||||
# Use regex to find all the <a> tags and extract the href attribute
|
||||
@@ -36,27 +35,42 @@ class BlenderDownloader:
|
||||
@staticmethod
|
||||
def get_minor_versions(major_version, system_os=None, cpu=None):
|
||||
|
||||
base_url = url + 'Blender' + major_version
|
||||
try:
|
||||
base_url = url + 'Blender' + major_version
|
||||
response = requests.get(base_url, timeout=5)
|
||||
response.raise_for_status()
|
||||
|
||||
response = requests.get(base_url)
|
||||
response.raise_for_status()
|
||||
versions_pattern = r'<a href="(?P<file>[^"]+)">blender-(?P<version>[\d\.]+)-(?P<system_os>\w+)-(?P<cpu>\w+).*</a>'
|
||||
versions_data = [match.groupdict() for match in re.finditer(versions_pattern, response.text)]
|
||||
|
||||
versions_pattern = r'<a href="(?P<file>[^"]+)">blender-(?P<version>[\d\.]+)-(?P<system_os>\w+)-(?P<cpu>\w+).*</a>'
|
||||
versions_data = [match.groupdict() for match in re.finditer(versions_pattern, response.text)]
|
||||
# Filter to just the supported formats
|
||||
versions_data = [item for item in versions_data if any(item["file"].endswith(ext) for ext in supported_formats)]
|
||||
|
||||
# Filter to just the supported formats
|
||||
versions_data = [item for item in versions_data if any(item["file"].endswith(ext) for ext in supported_formats)]
|
||||
|
||||
if system_os:
|
||||
# Filter down OS and CPU
|
||||
system_os = system_os or current_system_os()
|
||||
cpu = cpu or current_system_cpu()
|
||||
versions_data = [x for x in versions_data if x['system_os'] == system_os]
|
||||
if cpu:
|
||||
versions_data = [x for x in versions_data if x['cpu'] == cpu]
|
||||
|
||||
for v in versions_data:
|
||||
v['url'] = os.path.join(base_url, v['file'])
|
||||
for v in versions_data:
|
||||
v['url'] = base_url + '/' + v['file']
|
||||
|
||||
versions_data = sorted(versions_data, key=lambda x: x['version'], reverse=True)
|
||||
return versions_data
|
||||
versions_data = sorted(versions_data, key=lambda x: x['version'], reverse=True)
|
||||
return versions_data
|
||||
except requests.exceptions.HTTPError as e:
|
||||
logger.error(f"Invalid url: {e}")
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
return []
|
||||
|
||||
@classmethod
|
||||
def version_is_available_to_download(cls, version, system_os=None, cpu=None):
|
||||
requested_major_version = '.'.join(version.split('.')[:2])
|
||||
minor_versions = cls.get_minor_versions(requested_major_version, system_os, cpu)
|
||||
for minor in minor_versions:
|
||||
if minor['version'] == version:
|
||||
return minor
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def find_LTS_versions():
|
||||
@@ -71,18 +85,18 @@ class BlenderDownloader:
|
||||
return lts_versions
|
||||
|
||||
@classmethod
|
||||
def find_most_recent_version(cls, system_os, cpu, lts_only=False):
|
||||
def find_most_recent_version(cls, system_os=None, cpu=None, lts_only=False):
|
||||
try:
|
||||
major_version = cls.find_LTS_versions()[0] if lts_only else cls.get_major_versions()[0]
|
||||
most_recent = cls.get_minor_versions(major_version, system_os, cpu)[0]
|
||||
return most_recent
|
||||
most_recent = cls.get_minor_versions(major_version=major_version, system_os=system_os, cpu=cpu)
|
||||
return most_recent[0]
|
||||
except IndexError:
|
||||
logger.error("Cannot find a most recent version")
|
||||
|
||||
@classmethod
|
||||
def download_engine(cls, version, download_location, system_os=None, cpu=None):
|
||||
system_os = system_os or platform.system().lower().replace('darwin', 'macos')
|
||||
cpu = cpu or platform.machine().lower().replace('amd64', 'x64')
|
||||
def download_engine(cls, version, download_location, system_os=None, cpu=None, timeout=120):
|
||||
system_os = system_os or current_system_os()
|
||||
cpu = cpu or current_system_cpu()
|
||||
|
||||
try:
|
||||
logger.info(f"Requesting download of blender-{version}-{system_os}-{cpu}")
|
||||
@@ -90,7 +104,8 @@ class BlenderDownloader:
|
||||
minor_versions = [x for x in cls.get_minor_versions(major_version, system_os, cpu) if x['version'] == version]
|
||||
# we get the URL instead of calculating it ourselves. May change this
|
||||
|
||||
download_and_extract_app(remote_url=minor_versions[0]['url'], download_location=download_location)
|
||||
download_and_extract_app(remote_url=minor_versions[0]['url'], download_location=download_location,
|
||||
timeout=timeout)
|
||||
except IndexError:
|
||||
logger.error("Cannot find requested engine")
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
try:
|
||||
from .base_engine import *
|
||||
except ImportError:
|
||||
from base_engine import *
|
||||
import json
|
||||
import re
|
||||
import logging
|
||||
|
||||
from src.engines.core.base_engine import *
|
||||
from src.utilities.misc_helper import system_safe_path
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
@@ -13,6 +11,7 @@ class Blender(BaseRenderEngine):
|
||||
|
||||
install_paths = ['/Applications/Blender.app/Contents/MacOS/Blender']
|
||||
supported_extensions = ['.blend']
|
||||
binary_names = {'linux': 'blender', 'windows': 'blender.exe', 'macos': 'Blender'}
|
||||
|
||||
def version(self):
|
||||
version = None
|
||||
@@ -36,18 +35,17 @@ class Blender(BaseRenderEngine):
|
||||
return subprocess.run([self.renderer_path(), '-b', project_path, '--python-expr', python_expression],
|
||||
capture_output=True, timeout=timeout)
|
||||
except Exception as e:
|
||||
logger.warning(f"Error running python expression in blender: {e}")
|
||||
pass
|
||||
logger.error(f"Error running python expression in blender: {e}")
|
||||
else:
|
||||
raise FileNotFoundError(f'Project file not found: {project_path}')
|
||||
|
||||
def run_python_script(self, project_path, script_path, timeout=None):
|
||||
if os.path.exists(project_path) and os.path.exists(script_path):
|
||||
try:
|
||||
return subprocess.run([self.default_renderer_path(), '-b', project_path, '--python', script_path],
|
||||
return subprocess.run([self.renderer_path(), '-b', project_path, '--python', script_path],
|
||||
capture_output=True, timeout=timeout)
|
||||
except Exception as e:
|
||||
logger.warning(f"Error running python expression in blender: {e}")
|
||||
logger.warning(f"Error running python script in blender: {e}")
|
||||
pass
|
||||
elif not os.path.exists(project_path):
|
||||
raise FileNotFoundError(f'Project file not found: {project_path}')
|
||||
@@ -58,8 +56,8 @@ class Blender(BaseRenderEngine):
|
||||
def get_scene_info(self, project_path, timeout=10):
|
||||
scene_info = {}
|
||||
try:
|
||||
results = self.run_python_script(project_path, os.path.join(os.path.dirname(os.path.realpath(__file__)),
|
||||
'scripts', 'blender', 'get_file_info.py'), timeout=timeout)
|
||||
script_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'scripts', 'get_file_info.py')
|
||||
results = self.run_python_script(project_path, system_safe_path(script_path), timeout=timeout)
|
||||
result_text = results.stdout.decode()
|
||||
for line in result_text.splitlines():
|
||||
if line.startswith('SCENE_DATA:'):
|
||||
@@ -76,8 +74,8 @@ class Blender(BaseRenderEngine):
|
||||
# Credit to L0Lock for pack script - https://blender.stackexchange.com/a/243935
|
||||
try:
|
||||
logger.info(f"Starting to pack Blender file: {project_path}")
|
||||
results = self.run_python_script(project_path, os.path.join(os.path.dirname(os.path.realpath(__file__)),
|
||||
'scripts', 'blender', 'pack_project.py'), timeout=timeout)
|
||||
script_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'scripts', 'pack_project.py')
|
||||
results = self.run_python_script(project_path, system_safe_path(script_path), timeout=timeout)
|
||||
|
||||
result_text = results.stdout.decode()
|
||||
dir_name = os.path.dirname(project_path)
|
||||
@@ -2,18 +2,18 @@
|
||||
import re
|
||||
from collections import Counter
|
||||
|
||||
from src.engines.blender_engine import Blender
|
||||
from src.engines.blender.blender_engine import Blender
|
||||
from src.utilities.ffmpeg_helper import image_sequence_to_video
|
||||
from src.workers.base_worker import *
|
||||
from src.engines.core.base_worker import *
|
||||
|
||||
|
||||
class BlenderRenderWorker(BaseRenderWorker):
|
||||
|
||||
engine = Blender
|
||||
|
||||
def __init__(self, input_path, output_path, args=None, parent=None, name=None):
|
||||
super(BlenderRenderWorker, self).__init__(input_path=input_path, output_path=output_path, args=args,
|
||||
parent=parent, name=name)
|
||||
def __init__(self, input_path, output_path, engine_path, args=None, parent=None, name=None):
|
||||
super(BlenderRenderWorker, self).__init__(input_path=input_path, output_path=output_path,
|
||||
engine_path=engine_path, args=args, parent=parent, name=name)
|
||||
|
||||
# Args
|
||||
self.blender_engine = self.args.get('engine', 'BLENDER_EEVEE').upper()
|
||||
@@ -24,15 +24,14 @@ class BlenderRenderWorker(BaseRenderWorker):
|
||||
self.__frame_percent_complete = 0.0
|
||||
|
||||
# Scene Info
|
||||
self.scene_info = Blender().get_scene_info(input_path)
|
||||
self.scene_info = Blender(engine_path).get_scene_info(input_path)
|
||||
self.start_frame = int(self.scene_info.get('start_frame', 1))
|
||||
self.end_frame = int(self.scene_info.get('end_frame', self.start_frame))
|
||||
self.project_length = (self.end_frame - self.start_frame) + 1
|
||||
self.current_frame = -1
|
||||
|
||||
def generate_worker_subprocess(self):
|
||||
|
||||
cmd = [self.engine.default_renderer_path()]
|
||||
cmd = [self.renderer_path]
|
||||
if self.args.get('background', True): # optionally run render not in background
|
||||
cmd.append('-b')
|
||||
cmd.append(self.input_path)
|
||||
@@ -13,6 +13,8 @@ class BaseRenderEngine(object):
|
||||
|
||||
def __init__(self, custom_path=None):
|
||||
self.custom_renderer_path = custom_path
|
||||
if not self.renderer_path():
|
||||
raise FileNotFoundError(f"Cannot find path to renderer for {self.name()} instance")
|
||||
|
||||
def renderer_path(self):
|
||||
return self.custom_renderer_path or self.default_renderer_path()
|
||||
@@ -24,9 +26,9 @@ class BaseRenderEngine(object):
|
||||
@classmethod
|
||||
def default_renderer_path(cls):
|
||||
path = None
|
||||
try:
|
||||
try: # Linux and macOS
|
||||
path = subprocess.check_output(['which', cls.name()], timeout=SUBPROCESS_TIMEOUT).decode('utf-8').strip()
|
||||
except subprocess.CalledProcessError:
|
||||
except (subprocess.CalledProcessError, FileNotFoundError):
|
||||
for p in cls.install_paths:
|
||||
if os.path.exists(p):
|
||||
path = p
|
||||
@@ -30,10 +30,12 @@ class BaseRenderWorker(Base):
|
||||
end_time = Column(DateTime, nullable=True)
|
||||
renderer = Column(String)
|
||||
renderer_version = Column(String)
|
||||
renderer_path = Column(String)
|
||||
priority = Column(Integer)
|
||||
project_length = Column(Integer)
|
||||
start_frame = Column(Integer)
|
||||
end_frame = Column(Integer, nullable=True)
|
||||
current_frame = Column(Integer)
|
||||
parent = Column(String, nullable=True)
|
||||
children = Column(JSON)
|
||||
name = Column(String)
|
||||
@@ -42,7 +44,7 @@ class BaseRenderWorker(Base):
|
||||
|
||||
engine = None
|
||||
|
||||
def __init__(self, input_path, output_path, priority=2, args=None, ignore_extensions=True, parent=None,
|
||||
def __init__(self, input_path, output_path, engine_path, priority=2, args=None, ignore_extensions=True, parent=None,
|
||||
name=None):
|
||||
|
||||
if not ignore_extensions:
|
||||
@@ -64,7 +66,8 @@ class BaseRenderWorker(Base):
|
||||
self.args = args or {}
|
||||
self.date_created = datetime.now()
|
||||
self.renderer = self.engine.name()
|
||||
self.renderer_version = self.engine().version()
|
||||
self.renderer_path = engine_path
|
||||
self.renderer_version = self.engine(engine_path).version()
|
||||
self.custom_renderer_path = None
|
||||
self.priority = priority
|
||||
self.parent = parent
|
||||
@@ -73,7 +76,7 @@ class BaseRenderWorker(Base):
|
||||
|
||||
# Frame Ranges
|
||||
self.project_length = -1
|
||||
self.current_frame = 0 # should this be a 1 ?
|
||||
self.current_frame = -1 # negative indicates not started
|
||||
self.start_frame = 0 # should this be a 1 ?
|
||||
self.end_frame = None
|
||||
|
||||
@@ -159,7 +162,7 @@ class BaseRenderWorker(Base):
|
||||
self.errors.append(msg)
|
||||
return
|
||||
|
||||
if not self.engine.default_renderer_path() and not self.custom_renderer_path:
|
||||
if not os.path.exists(self.renderer_path):
|
||||
self.status = RenderStatus.ERROR
|
||||
msg = 'Cannot find render engine path for {}'.format(self.engine.name())
|
||||
logger.error(msg)
|
||||
@@ -168,7 +171,7 @@ class BaseRenderWorker(Base):
|
||||
|
||||
self.status = RenderStatus.RUNNING
|
||||
self.start_time = datetime.now()
|
||||
logger.info(f'Starting {self.engine.name()} {self.engine().version()} Render for {self.input_path} | '
|
||||
logger.info(f'Starting {self.engine.name()} {self.renderer_version} Render for {self.input_path} | '
|
||||
f'Frame Count: {self.total_frames}')
|
||||
self.__thread.start()
|
||||
|
||||
@@ -183,7 +186,7 @@ class BaseRenderWorker(Base):
|
||||
|
||||
with open(self.log_path(), "a") as f:
|
||||
|
||||
f.write(f"{self.start_time.isoformat()} - Starting {self.engine.name()} {self.engine().version()} "
|
||||
f.write(f"{self.start_time.isoformat()} - Starting {self.engine.name()} {self.renderer_version} "
|
||||
f"render for {self.input_path}\n\n")
|
||||
f.write(f"Running command: {subprocess_cmds}\n")
|
||||
f.write('=' * 80 + '\n\n')
|
||||
@@ -234,7 +237,7 @@ class BaseRenderWorker(Base):
|
||||
|
||||
if self.children:
|
||||
from src.distributed_job_manager import DistributedJobManager
|
||||
DistributedJobManager.wait_for_subjobs(local_job=self)
|
||||
DistributedJobManager.wait_for_subjobs(parent_job=self)
|
||||
|
||||
# Post Render Work
|
||||
logger.debug("Starting post-processing work")
|
||||
@@ -302,7 +305,6 @@ class BaseRenderWorker(Base):
|
||||
'children': self.children,
|
||||
'date_created': self.date_created,
|
||||
'start_time': self.start_time,
|
||||
'end_time': self.end_time,
|
||||
'status': self.status.value,
|
||||
'file_hash': self.file_hash,
|
||||
'percent_complete': self.percent_complete(),
|
||||
@@ -312,6 +314,7 @@ class BaseRenderWorker(Base):
|
||||
'errors': getattr(self, 'errors', None),
|
||||
'start_frame': self.start_frame,
|
||||
'end_frame': self.end_frame,
|
||||
'current_frame': self.current_frame,
|
||||
'total_frames': self.total_frames,
|
||||
'last_output': getattr(self, 'last_output', None),
|
||||
'log_path': self.log_path()
|
||||
@@ -12,7 +12,7 @@ supported_formats = ['.zip', '.tar.xz', '.dmg']
|
||||
logger = logging.getLogger()
|
||||
|
||||
|
||||
def download_and_extract_app(remote_url, download_location):
|
||||
def download_and_extract_app(remote_url, download_location, timeout=120):
|
||||
|
||||
# Create a temp download directory
|
||||
temp_download_dir = tempfile.mkdtemp()
|
||||
@@ -30,7 +30,7 @@ def download_and_extract_app(remote_url, download_location):
|
||||
if not os.path.exists(temp_downloaded_file_path):
|
||||
# Make a GET request to the URL with stream=True to enable streaming
|
||||
logger.info(f"Downloading {output_dir_name} from {remote_url}")
|
||||
response = requests.get(remote_url, stream=True)
|
||||
response = requests.get(remote_url, stream=True, timeout=timeout)
|
||||
|
||||
# Check if the request was successful
|
||||
if response.status_code == 200:
|
||||
@@ -54,6 +54,7 @@ def download_and_extract_app(remote_url, download_location):
|
||||
logger.info(f"Successfully downloaded {os.path.basename(temp_downloaded_file_path)}")
|
||||
else:
|
||||
logger.error(f"Failed to download the file. Status code: {response.status_code}")
|
||||
return
|
||||
|
||||
os.makedirs(download_location, exist_ok=True)
|
||||
|
||||
61
src/engines/core/worker_factory.py
Normal file
61
src/engines/core/worker_factory.py
Normal file
@@ -0,0 +1,61 @@
|
||||
import logging
|
||||
|
||||
from src.engines.engine_manager import EngineManager
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
|
||||
class RenderWorkerFactory:
|
||||
|
||||
@staticmethod
|
||||
def supported_classes():
|
||||
# to add support for any additional RenderWorker classes, import their classes and add to list here
|
||||
from src.engines.blender.blender_worker import BlenderRenderWorker
|
||||
from src.engines.aerender.aerender_worker import AERenderWorker
|
||||
from src.engines.ffmpeg.ffmpeg_worker import FFMPEGRenderWorker
|
||||
classes = [BlenderRenderWorker, AERenderWorker, FFMPEGRenderWorker]
|
||||
return classes
|
||||
|
||||
@staticmethod
|
||||
def create_worker(renderer, input_path, output_path, engine_version=None, args=None, parent=None, name=None):
|
||||
|
||||
worker_class = RenderWorkerFactory.class_for_name(renderer)
|
||||
|
||||
# check to make sure we have versions installed
|
||||
all_versions = EngineManager.all_versions_for_engine(renderer)
|
||||
if not all_versions:
|
||||
raise FileNotFoundError(f"Cannot find any installed {renderer} engines")
|
||||
|
||||
# Find the path to the requested engine version or use default
|
||||
engine_path = None if engine_version else all_versions[0]['path']
|
||||
if engine_version:
|
||||
for ver in all_versions:
|
||||
if ver['version'] == engine_version:
|
||||
engine_path = ver['path']
|
||||
break
|
||||
|
||||
# Download the required engine if not found locally
|
||||
if not engine_path:
|
||||
download_result = EngineManager.download_engine(renderer, engine_version)
|
||||
if not download_result:
|
||||
raise FileNotFoundError(f"Cannot download requested version: {renderer} {engine_version}")
|
||||
engine_path = download_result['path']
|
||||
logger.info("Engine downloaded. Creating worker.")
|
||||
|
||||
if not engine_path:
|
||||
raise FileNotFoundError(f"Cannot find requested engine version {engine_version}")
|
||||
|
||||
return worker_class(input_path=input_path, output_path=output_path, engine_path=engine_path, args=args,
|
||||
parent=parent, name=name)
|
||||
|
||||
@staticmethod
|
||||
def supported_renderers():
|
||||
return [x.engine.name() for x in RenderWorkerFactory.supported_classes()]
|
||||
|
||||
@staticmethod
|
||||
def class_for_name(name):
|
||||
name = name.lower()
|
||||
for render_class in RenderWorkerFactory.supported_classes():
|
||||
if render_class.engine.name() == name:
|
||||
return render_class
|
||||
raise LookupError(f'Cannot find class for name: {name}')
|
||||
@@ -1,112 +0,0 @@
|
||||
import logging
|
||||
import os
|
||||
import platform
|
||||
import re
|
||||
|
||||
import requests
|
||||
|
||||
from .downloader_core import download_and_extract_app
|
||||
|
||||
logger = logging.getLogger()
|
||||
supported_formats = ['.zip', '.tar.xz', '.dmg']
|
||||
|
||||
|
||||
class FFMPEGDownloader:
|
||||
|
||||
# macOS FFMPEG mirror maintained by Evermeet - https://evermeet.cx/ffmpeg/
|
||||
macos_url = "https://evermeet.cx/pub/ffmpeg/"
|
||||
|
||||
# Linux FFMPEG mirror maintained by John van Sickle - https://johnvansickle.com/ffmpeg/
|
||||
linux_url = "https://johnvansickle.com/ffmpeg/"
|
||||
|
||||
# macOS FFMPEG mirror maintained by GyanD - https://www.gyan.dev/ffmpeg/builds/
|
||||
windows_download_url = "https://github.com/GyanD/codexffmpeg/releases/download/"
|
||||
windows_api_url = "https://api.github.com/repos/GyanD/codexffmpeg/releases"
|
||||
|
||||
@classmethod
|
||||
def get_macos_versions(cls):
|
||||
response = requests.get(cls.macos_url)
|
||||
response.raise_for_status()
|
||||
|
||||
link_pattern = r'>(.*\.zip)[^\.]'
|
||||
link_matches = re.findall(link_pattern, response.text)
|
||||
|
||||
return [link.split('-')[-1].split('.zip')[0] for link in link_matches]
|
||||
|
||||
@classmethod
|
||||
def get_linux_versions(cls):
|
||||
|
||||
# Link 1 / 2 - Current Version
|
||||
response = requests.get(cls.linux_url)
|
||||
response.raise_for_status()
|
||||
current_release = re.findall(r'release: ([\w\.]+)', response.text)[0]
|
||||
|
||||
# Link 2 / 2 - Previous Versions
|
||||
response = requests.get(os.path.join(cls.linux_url, 'old-releases'))
|
||||
response.raise_for_status()
|
||||
releases = list(set(re.findall(r'href="ffmpeg-([\w\.]+)-.*">ffmpeg', response.text)))
|
||||
releases.sort(reverse=True)
|
||||
releases.insert(0, current_release)
|
||||
return releases
|
||||
|
||||
@classmethod
|
||||
def get_windows_versions(cls):
|
||||
response = requests.get(cls.windows_api_url)
|
||||
response.raise_for_status()
|
||||
|
||||
versions = []
|
||||
all_git_releases = response.json()
|
||||
for item in all_git_releases:
|
||||
if re.match(r'^[0-9.]+$', item['tag_name']):
|
||||
versions.append(item['tag_name'])
|
||||
return versions
|
||||
|
||||
@classmethod
|
||||
def find_most_recent_version(cls, system_os, cpu, lts_only=False):
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def download_engine(cls, version, download_location, system_os=None, cpu=None):
|
||||
system_os = system_os or platform.system().lower().replace('darwin', 'macos')
|
||||
cpu = cpu or platform.machine().lower().replace('amd64', 'x64')
|
||||
|
||||
# Verify requested version is available
|
||||
remote_url = None
|
||||
versions_per_os = {'linux': cls.get_linux_versions, 'macos': cls.get_macos_versions, 'windows': cls.get_windows_versions}
|
||||
if not versions_per_os.get(system_os):
|
||||
logger.error(f"Cannot find version list for {system_os}")
|
||||
return
|
||||
if version not in versions_per_os[system_os]():
|
||||
logger.error(f"Cannot find FFMPEG version {version} for {system_os}")
|
||||
|
||||
# Platform specific naming cleanup
|
||||
if system_os == 'macos':
|
||||
remote_url = os.path.join(cls.macos_url, f"ffmpeg-{version}.zip")
|
||||
download_location = os.path.join(download_location, f'ffmpeg-{version}-{system_os}-{cpu}') # override location to match linux
|
||||
elif system_os == 'linux':
|
||||
release_dir = 'releases' if version == cls.get_linux_versions()[0] else 'old-releases'
|
||||
remote_url = os.path.join(cls.linux_url, release_dir, f'ffmpeg-{version}-{cpu}-static.tar.xz')
|
||||
elif system_os == 'windows':
|
||||
remote_url = os.path.join(cls.windows_download_url, version, f'ffmpeg-{version}-full_build.zip')
|
||||
|
||||
# Download and extract
|
||||
try:
|
||||
logger.info(f"Requesting download of ffmpeg-{version}-{system_os}-{cpu}")
|
||||
download_and_extract_app(remote_url=remote_url, download_location=download_location)
|
||||
|
||||
# naming cleanup to match existing naming convention
|
||||
if system_os == 'linux':
|
||||
os.rename(os.path.join(download_location, f'ffmpeg-{version}-{cpu}-static'),
|
||||
os.path.join(download_location, f'ffmpeg-{version}-{system_os}-{cpu}'))
|
||||
elif system_os == 'windows':
|
||||
os.rename(os.path.join(download_location, f'ffmpeg-{version}-full_build'),
|
||||
os.path.join(download_location, f'ffmpeg-{version}-{system_os}-{cpu}'))
|
||||
|
||||
except IndexError:
|
||||
logger.error("Cannot download requested engine")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
# print(FFMPEGDownloader.download_engine('6.0', '/Users/brett/zordon-uploads/engines/'))
|
||||
print(FFMPEGDownloader.download_engine(version='6.0', download_location='/Users/brett/zordon-uploads/engines/'))
|
||||
@@ -1,18 +1,13 @@
|
||||
import os
|
||||
import logging
|
||||
import platform
|
||||
import os
|
||||
import shutil
|
||||
from .downloaders.blender_downloader import BlenderDownloader
|
||||
from .downloaders.ffmpeg_downloader import FFMPEGDownloader
|
||||
import threading
|
||||
|
||||
try:
|
||||
from .blender_engine import Blender
|
||||
except ImportError:
|
||||
from blender_engine import Blender
|
||||
try:
|
||||
from .ffmpeg_engine import FFMPEG
|
||||
except ImportError:
|
||||
from ffmpeg_engine import FFMPEG
|
||||
from src.engines.blender.blender_downloader import BlenderDownloader
|
||||
from src.engines.blender.blender_engine import Blender
|
||||
from src.engines.ffmpeg.ffmpeg_downloader import FFMPEGDownloader
|
||||
from src.engines.ffmpeg.ffmpeg_engine import FFMPEG
|
||||
from src.utilities.misc_helper import system_safe_path, current_system_os, current_system_cpu
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
@@ -20,6 +15,11 @@ logger = logging.getLogger()
|
||||
class EngineManager:
|
||||
|
||||
engines_path = "~/zordon-uploads/engines"
|
||||
downloader_classes = {
|
||||
"blender": BlenderDownloader,
|
||||
"ffmpeg": FFMPEGDownloader,
|
||||
# Add more engine types and corresponding downloader classes as needed
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def supported_engines(cls):
|
||||
@@ -37,14 +37,25 @@ class EngineManager:
|
||||
# Split the input string by dashes to get segments
|
||||
segments = directory.split('-')
|
||||
|
||||
# Define the keys for each word
|
||||
keys = ["engine", "version", "system_os", "cpu"]
|
||||
|
||||
# Create a dictionary with named keys
|
||||
executable_names = {'linux': 'blender', 'windows': 'blender.exe', 'macos': 'Blender.app'}
|
||||
keys = ["engine", "version", "system_os", "cpu"]
|
||||
result_dict = {keys[i]: segments[i] for i in range(min(len(keys), len(segments)))}
|
||||
result_dict['path'] = os.path.join(cls.engines_path, directory, executable_names.get(result_dict['system_os'], 'unknown'))
|
||||
result_dict['type'] = 'managed'
|
||||
|
||||
# Figure out the binary name for the path
|
||||
binary_name = result_dict['engine'].lower()
|
||||
for eng in cls.supported_engines():
|
||||
if eng.name().lower() == result_dict['engine']:
|
||||
binary_name = eng.binary_names.get(result_dict['system_os'], binary_name)
|
||||
|
||||
# Find path to binary
|
||||
path = None
|
||||
for root, _, files in os.walk(system_safe_path(os.path.join(cls.engines_path, directory))):
|
||||
if binary_name in files:
|
||||
path = os.path.join(root, binary_name)
|
||||
break
|
||||
|
||||
result_dict['path'] = path
|
||||
results.append(result_dict)
|
||||
except FileNotFoundError:
|
||||
logger.warning("Cannot find local engines download directory")
|
||||
@@ -53,8 +64,8 @@ class EngineManager:
|
||||
for eng in cls.supported_engines():
|
||||
if eng.default_renderer_path():
|
||||
results.append({'engine': eng.name(), 'version': eng().version(),
|
||||
'system_os': cls.system_os(),
|
||||
'cpu': cls.system_cpu(),
|
||||
'system_os': current_system_os(),
|
||||
'cpu': current_system_cpu(),
|
||||
'path': eng.default_renderer_path(), 'type': 'system'})
|
||||
|
||||
return results
|
||||
@@ -65,8 +76,8 @@ class EngineManager:
|
||||
|
||||
@classmethod
|
||||
def newest_engine_version(cls, engine, system_os=None, cpu=None):
|
||||
system_os = system_os or cls.system_os()
|
||||
cpu = cpu or cls.system_cpu()
|
||||
system_os = system_os or current_system_os()
|
||||
cpu = cpu or current_system_cpu()
|
||||
|
||||
try:
|
||||
filtered = [x for x in cls.all_engines() if x['engine'] == engine and x['system_os'] == system_os and x['cpu'] == cpu]
|
||||
@@ -77,52 +88,56 @@ class EngineManager:
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def has_engine_version(cls, engine, version, system_os=None, cpu=None):
|
||||
system_os = system_os or cls.system_os()
|
||||
cpu = cpu or cls.system_cpu()
|
||||
def is_version_downloaded(cls, engine, version, system_os=None, cpu=None):
|
||||
system_os = system_os or current_system_os()
|
||||
cpu = cpu or current_system_cpu()
|
||||
|
||||
filtered = [x for x in cls.all_engines() if
|
||||
x['engine'] == engine and x['system_os'] == system_os and x['cpu'] == cpu and x['version'] == version]
|
||||
return filtered[0] if filtered else False
|
||||
|
||||
@staticmethod
|
||||
def system_os():
|
||||
return platform.system().lower().replace('darwin', 'macos')
|
||||
@classmethod
|
||||
def version_is_available_to_download(cls, engine, version, system_os=None, cpu=None):
|
||||
try:
|
||||
return cls.downloader_classes[engine].version_is_available_to_download(version=version, system_os=system_os,
|
||||
cpu=cpu)
|
||||
except Exception as e:
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def system_cpu():
|
||||
return platform.machine().lower().replace('amd64', 'x64')
|
||||
@classmethod
|
||||
def find_most_recent_version(cls, engine=None, system_os=None, cpu=None, lts_only=False):
|
||||
try:
|
||||
return cls.downloader_classes[engine].find_most_recent_version(system_os=system_os, cpu=cpu)
|
||||
except Exception as e:
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def download_engine(cls, engine, version, system_os=None, cpu=None):
|
||||
existing_download = cls.has_engine_version(engine, version, system_os, cpu)
|
||||
existing_download = cls.is_version_downloaded(engine, version, system_os, cpu)
|
||||
if existing_download:
|
||||
logger.info(f"Requested download of {engine} {version}, but local copy already exists")
|
||||
return existing_download
|
||||
|
||||
logger.info(f"Requesting download of {engine} {version}")
|
||||
downloader_classes = {
|
||||
"blender": BlenderDownloader,
|
||||
"ffmpeg": FFMPEGDownloader,
|
||||
# Add more engine types and corresponding downloader classes as needed
|
||||
}
|
||||
|
||||
# Check if the provided engine type is valid
|
||||
if engine not in downloader_classes:
|
||||
if engine not in cls.downloader_classes:
|
||||
logger.error("No valid engine found")
|
||||
return
|
||||
|
||||
# Get the appropriate downloader class based on the engine type
|
||||
downloader = downloader_classes[engine]
|
||||
if downloader.download_engine(version, download_location=cls.engines_path, system_os=system_os, cpu=cpu):
|
||||
return cls.has_engine_version(engine, version, system_os, cpu)
|
||||
else:
|
||||
cls.downloader_classes[engine].download_engine(version, download_location=cls.engines_path,
|
||||
system_os=system_os, cpu=cpu, timeout=300)
|
||||
|
||||
# Check that engine was properly downloaded
|
||||
found_engine = cls.is_version_downloaded(engine, version, system_os, cpu)
|
||||
if not found_engine:
|
||||
logger.error(f"Error downloading {engine}")
|
||||
return found_engine
|
||||
|
||||
|
||||
@classmethod
|
||||
def delete_engine_download(cls, engine, version, system_os=None, cpu=None):
|
||||
logger.info(f"Requested deletion of engine: {engine}-{version}")
|
||||
found = cls.has_engine_version(engine, version, system_os, cpu)
|
||||
found = cls.is_version_downloaded(engine, version, system_os, cpu)
|
||||
if found:
|
||||
dir_path = os.path.dirname(found['path'])
|
||||
shutil.rmtree(dir_path, ignore_errors=True)
|
||||
@@ -131,9 +146,32 @@ class EngineManager:
|
||||
else:
|
||||
logger.error(f"Cannot find engine: {engine}-{version}")
|
||||
|
||||
@classmethod
|
||||
def update_all_engines(cls):
|
||||
def engine_update_task(engine, engine_downloader):
|
||||
logger.debug(f"Checking for updates to {engine}")
|
||||
latest_version = engine_downloader.find_most_recent_version()
|
||||
if latest_version:
|
||||
logger.debug(f"Latest version of {engine} available: {latest_version.get('version')}")
|
||||
if not cls.is_version_downloaded(engine, latest_version.get('version')):
|
||||
logger.info(f"Downloading {engine} ({latest_version['version']})")
|
||||
cls.download_engine(engine=engine, version=latest_version['version'])
|
||||
else:
|
||||
logger.warning(f"Unable to get latest version for {engine}")
|
||||
|
||||
logger.info(f"Checking for updates for render engines...")
|
||||
threads = []
|
||||
for engine, engine_downloader in cls.downloader_classes.items():
|
||||
thread = threading.Thread(target=engine_update_task, args=(engine, engine_downloader))
|
||||
threads.append(thread)
|
||||
thread.start()
|
||||
|
||||
for thread in threads: # wait to finish
|
||||
thread.join()
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
|
||||
# print(EngineManager.newest_engine_version('blender', 'macos', 'arm64'))
|
||||
EngineManager.delete_engine_download('blender', '3.2.1', 'macos', 'a')
|
||||
|
||||
|
||||
183
src/engines/ffmpeg/ffmpeg_downloader.py
Normal file
183
src/engines/ffmpeg/ffmpeg_downloader.py
Normal file
@@ -0,0 +1,183 @@
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
|
||||
import requests
|
||||
|
||||
from src.engines.core.downloader_core import download_and_extract_app
|
||||
from src.utilities.misc_helper import current_system_cpu, current_system_os
|
||||
|
||||
logger = logging.getLogger()
|
||||
supported_formats = ['.zip', '.tar.xz', '.dmg']
|
||||
|
||||
|
||||
class FFMPEGDownloader:
|
||||
|
||||
# macOS FFMPEG mirror maintained by Evermeet - https://evermeet.cx/ffmpeg/
|
||||
macos_url = "https://evermeet.cx/pub/ffmpeg/"
|
||||
|
||||
# Linux FFMPEG mirror maintained by John van Sickle - https://johnvansickle.com/ffmpeg/
|
||||
linux_url = "https://johnvansickle.com/ffmpeg/"
|
||||
|
||||
# macOS FFMPEG mirror maintained by GyanD - https://www.gyan.dev/ffmpeg/builds/
|
||||
windows_download_url = "https://github.com/GyanD/codexffmpeg/releases/download/"
|
||||
windows_api_url = "https://api.github.com/repos/GyanD/codexffmpeg/releases"
|
||||
|
||||
# used to cache renderer versions in case they need to be accessed frequently
|
||||
version_cache = {}
|
||||
|
||||
@classmethod
|
||||
def __get_macos_versions(cls, use_cache=True):
|
||||
|
||||
# cache the versions locally
|
||||
version_cache = cls.version_cache.get('macos')
|
||||
if version_cache and use_cache:
|
||||
return version_cache
|
||||
|
||||
response = requests.get(cls.macos_url, timeout=5)
|
||||
response.raise_for_status()
|
||||
|
||||
link_pattern = r'>(.*\.zip)[^\.]'
|
||||
link_matches = re.findall(link_pattern, response.text)
|
||||
|
||||
releases = [link.split('-')[-1].split('.zip')[0] for link in link_matches]
|
||||
cls.version_cache['macos'] = releases
|
||||
return releases
|
||||
|
||||
@classmethod
|
||||
def __get_linux_versions(cls, use_cache=True):
|
||||
|
||||
# cache the versions locally
|
||||
version_cache = cls.version_cache.get('linux')
|
||||
if version_cache and use_cache:
|
||||
return version_cache
|
||||
|
||||
# Link 1 / 2 - Current Version
|
||||
response = requests.get(cls.linux_url, timeout=5)
|
||||
response.raise_for_status()
|
||||
current_release = re.findall(r'release: ([\w\.]+)', response.text)[0]
|
||||
|
||||
# Link 2 / 2 - Previous Versions
|
||||
response = requests.get(os.path.join(cls.linux_url, 'old-releases'), timeout=5)
|
||||
response.raise_for_status()
|
||||
releases = list(set(re.findall(r'href="ffmpeg-([\w\.]+)-.*">ffmpeg', response.text)))
|
||||
releases.sort(reverse=True)
|
||||
releases.insert(0, current_release)
|
||||
|
||||
# Add to cache
|
||||
cls.version_cache['linux'] = releases
|
||||
return releases
|
||||
|
||||
@classmethod
|
||||
def __get_windows_versions(cls, use_cache=True):
|
||||
|
||||
version_cache = cls.version_cache.get('windows')
|
||||
if version_cache and use_cache:
|
||||
return version_cache
|
||||
|
||||
response = requests.get(cls.windows_api_url, timeout=5)
|
||||
response.raise_for_status()
|
||||
|
||||
releases = []
|
||||
all_git_releases = response.json()
|
||||
for item in all_git_releases:
|
||||
if re.match(r'^[0-9.]+$', item['tag_name']):
|
||||
releases.append(item['tag_name'])
|
||||
|
||||
cls.version_cache['linux'] = releases
|
||||
return releases
|
||||
|
||||
@classmethod
|
||||
def find_most_recent_version(cls, system_os=None, cpu=None, lts_only=False):
|
||||
try:
|
||||
system_os = system_os or current_system_os()
|
||||
cpu = cpu or current_system_cpu()
|
||||
return cls.all_versions(system_os, cpu)[0]
|
||||
except TypeError:
|
||||
pass
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def all_versions(cls, system_os=None, cpu=None):
|
||||
system_os = system_os or current_system_os()
|
||||
cpu = cpu or current_system_cpu()
|
||||
versions_per_os = {'linux': cls.__get_linux_versions, 'macos': cls.__get_macos_versions,
|
||||
'windows': cls.__get_windows_versions}
|
||||
if not versions_per_os.get(system_os):
|
||||
logger.error(f"Cannot find version list for {system_os}")
|
||||
return
|
||||
|
||||
results = []
|
||||
all_versions = versions_per_os[system_os]()
|
||||
for version in all_versions:
|
||||
remote_url = cls.__get_remote_url_for_version(version=version, system_os=system_os, cpu=cpu)
|
||||
results.append({'cpu': cpu, 'file': os.path.basename(remote_url), 'system_os': system_os, 'url': remote_url,
|
||||
'version': version})
|
||||
return results
|
||||
|
||||
@classmethod
|
||||
def version_is_available_to_download(cls, version, system_os=None, cpu=None):
|
||||
for ver in cls.all_versions(system_os, cpu):
|
||||
if ver['version'] == version:
|
||||
return ver
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def __get_remote_url_for_version(cls, version, system_os, cpu):
|
||||
# Platform specific naming cleanup
|
||||
remote_url = None
|
||||
if system_os == 'macos':
|
||||
remote_url = os.path.join(cls.macos_url, f"ffmpeg-{version}.zip")
|
||||
elif system_os == 'linux':
|
||||
cpu = cpu.replace('x64', 'amd64') # change cpu to match repo naming convention
|
||||
latest_release = (version == cls.__get_linux_versions(use_cache=True)[0])
|
||||
release_dir = 'releases' if latest_release else 'old-releases'
|
||||
release_filename = f'ffmpeg-release-{cpu}-static.tar.xz' if latest_release else \
|
||||
f'ffmpeg-{version}-{cpu}-static.tar.xz'
|
||||
remote_url = os.path.join(cls.linux_url, release_dir, release_filename)
|
||||
elif system_os == 'windows':
|
||||
remote_url = f"{cls.windows_download_url.strip('/')}/{version}/ffmpeg-{version}-full_build.zip"
|
||||
else:
|
||||
logger.error("Unknown system os")
|
||||
return remote_url
|
||||
|
||||
@classmethod
|
||||
def download_engine(cls, version, download_location, system_os=None, cpu=None, timeout=120):
|
||||
system_os = system_os or current_system_os()
|
||||
cpu = cpu or current_system_cpu()
|
||||
|
||||
# Verify requested version is available
|
||||
found_version = [item for item in cls.all_versions(system_os, cpu) if item['version'] == version]
|
||||
if not found_version:
|
||||
logger.error(f"Cannot find FFMPEG version {version} for {system_os} and {cpu}")
|
||||
return
|
||||
|
||||
# Platform specific naming cleanup
|
||||
remote_url = cls.__get_remote_url_for_version(version=version, system_os=system_os, cpu=cpu)
|
||||
if system_os == 'macos': # override location to match linux
|
||||
download_location = os.path.join(download_location, f'ffmpeg-{version}-{system_os}-{cpu}')
|
||||
|
||||
# Download and extract
|
||||
try:
|
||||
logger.info(f"Requesting download of ffmpeg-{version}-{system_os}-{cpu}")
|
||||
download_and_extract_app(remote_url=remote_url, download_location=download_location, timeout=timeout)
|
||||
|
||||
# naming cleanup to match existing naming convention
|
||||
output_path = os.path.join(download_location, f'ffmpeg-{version}-{system_os}-{cpu}')
|
||||
if system_os == 'linux':
|
||||
initial_cpu = cpu.replace('x64', 'amd64') # change cpu to match repo naming convention
|
||||
os.rename(os.path.join(download_location, f'ffmpeg-{version}-{initial_cpu}-static'), output_path)
|
||||
elif system_os == 'windows':
|
||||
os.rename(os.path.join(download_location, f'ffmpeg-{version}-full_build'), output_path)
|
||||
return output_path
|
||||
except (IndexError, FileNotFoundError) as e:
|
||||
logger.error(f"Cannot download requested engine: {e}")
|
||||
except OSError as e:
|
||||
logger.error(f"OS error while processing engine download: {e}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
# print(FFMPEGDownloader.download_engine('6.0', '/Users/brett/zordon-uploads/engines/'))
|
||||
# print(FFMPEGDownloader.find_most_recent_version(system_os='linux'))
|
||||
print(FFMPEGDownloader.download_engine(version='6.0', download_location='/Users/brett/zordon-uploads/engines/', system_os='linux', cpu='x64'))
|
||||
@@ -1,12 +1,12 @@
|
||||
try:
|
||||
from .base_engine import *
|
||||
except ImportError:
|
||||
from base_engine import *
|
||||
import re
|
||||
|
||||
from src.engines.core.base_engine import *
|
||||
|
||||
|
||||
class FFMPEG(BaseRenderEngine):
|
||||
|
||||
binary_names = {'linux': 'ffmpeg', 'windows': 'ffmpeg.exe', 'macos': 'ffmpeg'}
|
||||
|
||||
def version(self):
|
||||
version = None
|
||||
try:
|
||||
@@ -31,11 +31,15 @@ class FFMPEG(BaseRenderEngine):
|
||||
return encoders
|
||||
|
||||
def get_all_formats(self):
|
||||
formats_raw = subprocess.check_output([self.renderer_path(), '-formats'], stderr=subprocess.DEVNULL,
|
||||
timeout=SUBPROCESS_TIMEOUT).decode('utf-8')
|
||||
pattern = '(?P<type>[DE]{1,2})\s+(?P<id>\S{2,})\s+(?P<name>.*)'
|
||||
all_formats = [m.groupdict() for m in re.finditer(pattern, formats_raw)]
|
||||
return all_formats
|
||||
try:
|
||||
formats_raw = subprocess.check_output([self.renderer_path(), '-formats'], stderr=subprocess.DEVNULL,
|
||||
timeout=SUBPROCESS_TIMEOUT).decode('utf-8')
|
||||
pattern = '(?P<type>[DE]{1,2})\s+(?P<id>\S{2,})\s+(?P<name>.*)\r'
|
||||
all_formats = [m.groupdict() for m in re.finditer(pattern, formats_raw)]
|
||||
return all_formats
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting all formats: {e}")
|
||||
return []
|
||||
|
||||
def extension_for_format(self, ffmpeg_format):
|
||||
# Extract the common extension using regex
|
||||
@@ -53,7 +57,7 @@ class FFMPEG(BaseRenderEngine):
|
||||
return [x for x in self.get_all_formats() if 'E' in x['type'].upper()]
|
||||
|
||||
def get_frame_count(self, path_to_file):
|
||||
raw_stdout = subprocess.check_output([self.default_renderer_path(), '-i', path_to_file, '-map', '0:v:0', '-c', 'copy',
|
||||
raw_stdout = subprocess.check_output([self.renderer_path(), '-i', path_to_file, '-map', '0:v:0', '-c', 'copy',
|
||||
'-f', 'null', '-'], stderr=subprocess.STDOUT,
|
||||
timeout=SUBPROCESS_TIMEOUT).decode('utf-8')
|
||||
match = re.findall(r'frame=\s*(\d+)', raw_stdout)
|
||||
@@ -2,8 +2,8 @@
|
||||
import re
|
||||
import subprocess
|
||||
|
||||
from src.workers.base_worker import BaseRenderWorker
|
||||
from src.engines.ffmpeg_engine import FFMPEG
|
||||
from src.engines.core.base_worker import BaseRenderWorker
|
||||
from src.engines.ffmpeg.ffmpeg_engine import FFMPEG
|
||||
|
||||
|
||||
class FFMPEGRenderWorker(BaseRenderWorker):
|
||||
@@ -14,12 +14,11 @@ class FFMPEGRenderWorker(BaseRenderWorker):
|
||||
super(FFMPEGRenderWorker, self).__init__(input_path=input_path, output_path=output_path, args=args,
|
||||
parent=parent, name=name)
|
||||
|
||||
stream_info = subprocess.check_output([self.engine.default_renderer_path(), "-i", # https://stackoverflow.com/a/61604105
|
||||
stream_info = subprocess.check_output([self.renderer_path, "-i", # https://stackoverflow.com/a/61604105
|
||||
input_path, "-map", "0:v:0", "-c", "copy", "-f", "null", "-y",
|
||||
"/dev/null"], stderr=subprocess.STDOUT).decode('utf-8')
|
||||
found_frames = re.findall('frame=\s*(\d+)', stream_info)
|
||||
self.project_length = found_frames[-1] if found_frames else '-1'
|
||||
self.current_frame = -1
|
||||
|
||||
def generate_worker_subprocess(self):
|
||||
|
||||
@@ -5,8 +5,8 @@ from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
from src.utilities.status_utils import RenderStatus
|
||||
from src.workers.worker_factory import RenderWorkerFactory
|
||||
from src.workers.base_worker import Base
|
||||
from src.engines.engine_manager import EngineManager
|
||||
from src.engines.core.base_worker import Base
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
@@ -33,10 +33,6 @@ class RenderQueue:
|
||||
def start_queue(cls):
|
||||
cls.load_state()
|
||||
|
||||
@classmethod
|
||||
def job_status_change(cls, job_id, status):
|
||||
logger.debug(f"Job status changed: {job_id} -> {status}")
|
||||
|
||||
@classmethod
|
||||
def add_to_render_queue(cls, render_job, force_start=False):
|
||||
logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job))
|
||||
@@ -69,10 +65,12 @@ class RenderQueue:
|
||||
|
||||
@classmethod
|
||||
def job_with_id(cls, job_id, none_ok=False):
|
||||
found_job = next((x for x in cls.all_jobs() if x.id == job_id), None)
|
||||
if not found_job and not none_ok:
|
||||
raise JobNotFoundError(job_id)
|
||||
return found_job
|
||||
for job in cls.all_jobs():
|
||||
if job.id == job_id:
|
||||
return job
|
||||
if not none_ok:
|
||||
raise JobNotFoundError(f"Cannot find job with id: {job_id}")
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def clear_history(cls):
|
||||
@@ -84,7 +82,7 @@ class RenderQueue:
|
||||
|
||||
@classmethod
|
||||
def load_state(cls):
|
||||
from src.workers.base_worker import BaseRenderWorker
|
||||
from src.engines.core.base_worker import BaseRenderWorker
|
||||
cls.job_queue = cls.session.query(BaseRenderWorker).all()
|
||||
|
||||
@classmethod
|
||||
@@ -100,7 +98,8 @@ class RenderQueue:
|
||||
|
||||
@classmethod
|
||||
def is_available_for_job(cls, renderer, priority=2):
|
||||
if not RenderWorkerFactory.class_for_name(renderer).engine.default_renderer_path():
|
||||
|
||||
if not EngineManager.all_versions_for_engine(renderer):
|
||||
return False
|
||||
|
||||
instances = cls.renderer_instances()
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
import subprocess
|
||||
from src.engines.ffmpeg_engine import FFMPEG
|
||||
from src.engines.ffmpeg.ffmpeg_engine import FFMPEG
|
||||
|
||||
|
||||
def image_sequence_to_video(source_glob_pattern, output_path, framerate=24, encoder="prores_ks", profile=4,
|
||||
start_frame=1):
|
||||
subprocess.run([FFMPEG.default_renderer_path(), "-framerate", str(framerate), "-start_number", str(start_frame), "-i",
|
||||
f"{source_glob_pattern}", "-c:v", encoder, "-profile:v", str(profile), '-pix_fmt', 'yuva444p10le',
|
||||
output_path], check=True)
|
||||
output_path], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
|
||||
|
||||
|
||||
def save_first_frame(source_path, dest_path, max_width=1280):
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import logging
|
||||
import os
|
||||
import platform
|
||||
import subprocess
|
||||
from datetime import datetime
|
||||
|
||||
@@ -103,3 +104,22 @@ def get_file_size_human(file_path):
|
||||
else:
|
||||
return f"{size_in_bytes / 1024 ** 4:.2f} TB"
|
||||
|
||||
|
||||
# Convert path to the appropriate format for the current platform
|
||||
def system_safe_path(path):
|
||||
if platform.system().lower() == "windows":
|
||||
return os.path.normpath(path)
|
||||
return path.replace("\\", "/")
|
||||
|
||||
|
||||
def current_system_os():
|
||||
return platform.system().lower().replace('darwin', 'macos')
|
||||
|
||||
|
||||
def current_system_os_version():
|
||||
return platform.mac_ver()[0] if current_system_os() == 'macos' else platform.release().lower()
|
||||
|
||||
|
||||
def current_system_cpu():
|
||||
# convert all x86 64 to "x64"
|
||||
return platform.machine().lower().replace('amd64', 'x64').replace('x86_64', 'x64')
|
||||
|
||||
@@ -1,27 +0,0 @@
|
||||
class RenderWorkerFactory:
|
||||
|
||||
@staticmethod
|
||||
def supported_classes():
|
||||
# to add support for any additional RenderWorker classes, import their classes and add to list here
|
||||
from src.workers.blender_worker import BlenderRenderWorker
|
||||
from src.workers.aerender_worker import AERenderWorker
|
||||
from src.workers.ffmpeg_worker import FFMPEGRenderWorker
|
||||
classes = [BlenderRenderWorker, AERenderWorker, FFMPEGRenderWorker]
|
||||
return classes
|
||||
|
||||
@staticmethod
|
||||
def create_worker(renderer, input_path, output_path, args=None, parent=None, name=None):
|
||||
worker_class = RenderWorkerFactory.class_for_name(renderer)
|
||||
return worker_class(input_path=input_path, output_path=output_path, args=args, parent=parent, name=name)
|
||||
|
||||
@staticmethod
|
||||
def supported_renderers():
|
||||
return [x.engine.name() for x in RenderWorkerFactory.supported_classes()]
|
||||
|
||||
@staticmethod
|
||||
def class_for_name(name):
|
||||
name = name.lower()
|
||||
for render_class in RenderWorkerFactory.supported_classes():
|
||||
if render_class.engine.name() == name:
|
||||
return render_class
|
||||
raise LookupError(f'Cannot find class for name: {name}')
|
||||
@@ -1,5 +1,5 @@
|
||||
#!/usr/bin/env python3
|
||||
from src.api_server import start_server
|
||||
from src.api.api_server import start_server
|
||||
|
||||
if __name__ == '__main__':
|
||||
start_server()
|
||||
|
||||
Reference in New Issue
Block a user