Job submission code and API cleanup (#127)

* Refactor job adding and make the add_job API accept a single job (instead of a list)

* Renamed to JobImportHandler and misc cleanup

* Don't bury exceptions in server proxy post_job

* Update code to create child jobs in a cleaner manner
2025-12-31 23:14:28 -06:00
committed by GitHub
parent e335328530
commit d8af7c878e
10 changed files with 267 additions and 357 deletions
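
For orientation, a hedged sketch (not part of the commit) of what a submission to the reworked endpoint might look like: one request now carries exactly one job, and per-camera variants travel as "child_jobs" whose keys override the parent's. The hostname, port, and field values below are illustrative assumptions (the server reads its real port from Config.port_number); the payload shape follows the add_job docstring in the diff.

    import requests

    job = {
        "name": "example.blend",
        "engine_name": "blender",
        "frame_start": 1,
        "frame_end": 100,
        # per-camera variants become child jobs; each dict overrides the parent's keys
        "child_jobs": [
            {"name": "example_CamA", "args": {"camera": "CamA"}},
            {"name": "example_CamB", "args": {"camera": "CamB"}},
        ],
        "local_path": "/projects/example.blend",  # server-local path, as used for localhost posts
    }

    response = requests.post("http://localhost:8080/api/add_job", json=job, timeout=30)
    response.raise_for_status()
    print([j["id"] for j in response.json()])  # the endpoint returns a list of created jobs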

View File

@@ -5,10 +5,9 @@ import logging
 import os
 import socket
 import sys
-import threading
 import time

-from server import start_server
+from server import ZordonServer
 from src.api.serverproxy_manager import ServerProxyManager

 logger = logging.getLogger()
@@ -84,23 +83,30 @@ def main():
     found_proxy = ServerProxyManager.get_proxy_for_hostname(local_hostname)
     is_connected = found_proxy.check_connection()
+    adhoc_server = None
     if not is_connected:
-        local_server_thread = threading.Thread(target=start_server, args=[True], daemon=True)
-        local_server_thread.start()
+        adhoc_server = ZordonServer()
+        adhoc_server.start_server()
+        found_proxy = ServerProxyManager.get_proxy_for_hostname(adhoc_server.server_hostname)
         while not is_connected:
             # todo: add timeout
-            # is_connected = found_proxy.check_connection()
+            is_connected = found_proxy.check_connection()
             time.sleep(1)

-    new_job = {"name": job_name, "engine": args.engine}
-    response = found_proxy.post_job_to_server(file_path, [new_job])
+    new_job = {"name": job_name, "engine_name": args.engine}
+    try:
+        response = found_proxy.post_job_to_server(file_path, new_job)
+    except Exception as e:
+        print(f"Error creating job: {e}")
+        exit(1)

     if response and response.ok:
         print(f"Uploaded to {found_proxy.hostname} successfully!")
-        running_job_data = response.json()[0]
+        running_job_data = response.json()
         job_id = running_job_data.get('id')
         print(f"Job {job_id} Summary:")
         print(f"  Status : {running_job_data.get('status')}")
-        print(f"  Engine : {running_job_data.get('engine')}-{running_job_data.get('engine_version')}")
+        print(f"  Engine : {running_job_data.get('engine_name')}-{running_job_data.get('engine_version')}")
         print("\nWaiting for render to complete...")
         percent_complete = 0.0
@@ -114,6 +120,11 @@ def main():
             print(f"Percent Complete: {percent_complete:.2%}")
             sys.stdout.flush()
         print("Finished rendering successfully!")
+    else:
+        print(f"Failed to upload job: {response.text}")
+
+    if adhoc_server:
+        adhoc_server.stop_server()

 if __name__ == "__main__":

View File

@@ -1,120 +0,0 @@
-#!/usr/bin/env python3
-import argparse
-import logging
-import os
-import socket
-import sys
-import threading
-import time
-
-from server import start_server
-from src.api.serverproxy_manager import ServerProxyManager
-
-logger = logging.getLogger()
-
-
-def main():
-    parser = argparse.ArgumentParser(
-        description="Zordon CLI tool for preparing/submitting a render job",
-        formatter_class=argparse.ArgumentDefaultsHelpFormatter
-    )
-
-    # Required arguments
-    parser.add_argument("scene_file", help="Path to the scene file (e.g., .blend, .max, .mp4)")
-    parser.add_argument("engine", help="Desired render engine", choices=['blender', 'ffmpeg'])
-
-    # Frame range
-    parser.add_argument("--start", type=int, default=1, help="Start frame")
-    parser.add_argument("--end", type=int, default=1, help="End frame")
-
-    # Job metadata
-    parser.add_argument("--name", default=None, help="Job name")
-
-    # Output
-    parser.add_argument("--output", default="", help="Output path/pattern (e.g., /renders/frame_####.exr)")
-
-    # Target OS and Engine Version
-    parser.add_argument(
-        "--os",
-        choices=["any", "windows", "linux", "macos"],
-        default="any",
-        help="Target operating system for render workers"
-    )
-    parser.add_argument(
-        "--engine-version",
-        default="latest",
-        help="Required renderer/engine version number (e.g., '4.2', '5.0')"
-    )
-
-    # Optional flags
-    parser.add_argument("--dry-run", action="store_true", help="Print job details without submitting")
-
-    args = parser.parse_args()
-
-    # Basic validation
-    if not os.path.exists(args.scene_file):
-        print(f"Error: Scene file '{args.scene_file}' not found!", file=sys.stderr)
-        sys.exit(1)
-    if args.start > args.end:
-        print("Error: Start frame cannot be greater than end frame!", file=sys.stderr)
-        sys.exit(1)
-
-    # Calculate total frames
-    total_frames = len(range(args.start, args.end + 1))
-    job_name = args.name or os.path.basename(args.scene_file)
-    file_path = os.path.abspath(args.scene_file)
-
-    # Print job summary
-    print("Render Job Summary:")
-    print(f"  Job Name       : {job_name}")
-    print(f"  Scene File     : {file_path}")
-    print(f"  Engine         : {args.engine}")
-    print(f"  Frames         : {args.start}-{args.end} ({total_frames} frames)")
-    print(f"  Output Path    : {args.output or '(default from scene)'}")
-    print(f"  Target OS      : {args.os}")
-    print(f"  Engine Version : {args.engine_version}")
-
-    if args.dry_run:
-        print("\nDry run complete (no submission performed).")
-        return
-
-    local_hostname = socket.gethostname()
-    local_hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "")
-    found_proxy = ServerProxyManager.get_proxy_for_hostname(local_hostname)
-    is_connected = found_proxy.check_connection()
-    if not is_connected:
-        local_server_thread = threading.Thread(target=start_server, args=[True], daemon=True)
-        local_server_thread.start()
-        while not is_connected:
-            # todo: add timeout
-            # is_connected = found_proxy.check_connection()
-            time.sleep(1)
-
-    new_job = {"name": job_name, "renderer": args.engine}
-    response = found_proxy.post_job_to_server(file_path, [new_job])
-    if response and response.ok:
-        print(f"Uploaded to {found_proxy.hostname} successfully!")
-        running_job_data = response.json()[0]
-        job_id = running_job_data.get('id')
-        print(f"Job {job_id} Summary:")
-        print(f"  Status : {running_job_data.get('status')}")
-        print(f"  Engine : {running_job_data.get('renderer')}-{running_job_data.get('renderer_version')}")
-        print("\nWaiting for render to complete...")
-        percent_complete = 0.0
-        while percent_complete < 1.0:
-            # add checks for errors
-            time.sleep(1)
-            running_job_data = found_proxy.get_job_info(job_id)
-            percent_complete = running_job_data['percent_complete']
-            sys.stdout.write("\x1b[1A")  # Move up 1
-            sys.stdout.write("\x1b[0J")  # Clear from cursor to end of screen (optional)
-            print(f"Percent Complete: {percent_complete:.2%}")
-            sys.stdout.flush()
-        print("Finished rendering successfully!")
-
-
-if __name__ == "__main__":
-    main()

View File

@@ -2,10 +2,8 @@ import logging
 import multiprocessing
 import os
 import socket
-import sys
 import threading

-import cpuinfo
 import psutil

 from src.api.api_server import API_VERSION
@@ -19,7 +17,7 @@ from src.utilities.config import Config
 from src.utilities.misc_helper import (get_gpu_info, system_safe_path, current_system_cpu, current_system_os,
                                        current_system_os_version, current_system_cpu_brand, check_for_updates)
 from src.utilities.zeroconf_server import ZeroconfServer
-from src.version import APP_NAME, APP_VERSION, APP_REPO_NAME, APP_REPO_OWNER
+from src.version import APP_NAME, APP_VERSION

 logger = logging.getLogger()
@@ -45,12 +43,8 @@ class ZordonServer:
         PreviewManager.storage_path = system_safe_path(
             os.path.join(os.path.expanduser(Config.upload_folder), 'previews'))

-        # Debug info
-        logger.debug(f"Upload directory: {os.path.expanduser(Config.upload_folder)}")
-        logger.debug(f"Thumbs directory: {PreviewManager.storage_path}")
-        logger.debug(f"Engines directory: {EngineManager.engines_path}")
-
         self.api_server = None
+        self.server_hostname = None

     def start_server(self):
@@ -77,22 +71,25 @@ class ZordonServer:
             raise ProcessLookupError(err_msg)

         # main start
-        logger.info(f"Starting {APP_NAME} Render Server")
+        logger.info(f"Starting {APP_NAME} Render Server ({APP_VERSION})")
+        logger.debug(f"Upload directory: {os.path.expanduser(Config.upload_folder)}")
+        logger.debug(f"Thumbs directory: {PreviewManager.storage_path}")
+        logger.debug(f"Engines directory: {EngineManager.engines_path}")

         # Set up the RenderQueue object
         RenderQueue.load_state(database_directory=system_safe_path(os.path.expanduser(Config.upload_folder)))
         ServerProxyManager.subscribe_to_listener()
         DistributedJobManager.subscribe_to_listener()

         # get hostname
-        local_hostname = socket.gethostname()
+        self.server_hostname = socket.gethostname()

         # configure and start API server
-        self.api_server = threading.Thread(target=start_api_server, args=(local_hostname,))
+        self.api_server = threading.Thread(target=start_api_server, args=(self.server_hostname,))
         self.api_server.daemon = True
         self.api_server.start()

         # start zeroconf server
-        ZeroconfServer.configure(f"_{APP_NAME.lower()}._tcp.local.", local_hostname, Config.port_number)
+        ZeroconfServer.configure(f"_{APP_NAME.lower()}._tcp.local.", self.server_hostname, Config.port_number)
         ZeroconfServer.properties = {'system_cpu': current_system_cpu(),
                                      'system_cpu_brand': current_system_cpu_brand(),
                                      'system_cpu_cores': multiprocessing.cpu_count(),
@@ -102,7 +99,7 @@ class ZordonServer:
                                      'gpu_info': get_gpu_info(),
                                      'api_version': API_VERSION}
         ZeroconfServer.start()
-        logger.info(f"{APP_NAME} Render Server started - Hostname: {local_hostname}")
+        logger.info(f"{APP_NAME} Render Server started - Hostname: {self.server_hostname}")
         RenderQueue.start()  # Start evaluating the render queue

     def is_running(self):
@@ -125,6 +122,6 @@ if __name__ == '__main__':
     except KeyboardInterrupt:
         pass
     except Exception as e:
-        logger.error(f"Unhandled exception: {e}")
+        logger.fatal(f"Unhandled exception: {e}")
     finally:
         server.stop_server()
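
For context, a hedged sketch (not part of the commit) of the ad-hoc lifecycle the CLI now builds on: ZordonServer records its hostname on the instance, so callers can look up a proxy for it instead of re-deriving the local hostname themselves.

    server = ZordonServer()
    server.start_server()  # spins up the API server, zeroconf, and the render queue
    proxy = ServerProxyManager.get_proxy_for_hostname(server.server_hostname)  # hostname now lives on the instance
    # ... submit and monitor jobs through the proxy ...
    server.stop_server()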

View File

@@ -1,150 +0,0 @@
-#!/usr/bin/env python3
-import logging
-import os
-import shutil
-import tempfile
-import zipfile
-from datetime import datetime
-
-import requests
-from tqdm import tqdm
-from werkzeug.utils import secure_filename
-
-logger = logging.getLogger()
-
-
-def handle_uploaded_project_files(request, jobs_list, upload_directory):
-    """
-    Handles the uploaded project files.
-
-    This method takes a request with a file, a list of jobs, and an upload directory. It checks if the file was
-    uploaded directly, if it needs to be downloaded from a URL, or if it's already present on the local file system.
-    It then moves the file to the appropriate directory and returns the local path to the file and its name.
-
-    Args:
-        request (Request): The request object containing the file.
-        jobs_list (list): A list of jobs. The first job in the list is used to get the file's URL and local path.
-        upload_directory (str): The directory where the file should be uploaded.
-
-    Raises:
-        ValueError: If no valid project paths are found.
-
-    Returns:
-        tuple: A tuple containing the local path to the loaded project file and its name.
-    """
-    # Initialize default values
-    loaded_project_local_path = None
-    uploaded_project = request.files.get('file', None)
-    project_url = jobs_list[0].get('url', None)
-    local_path = jobs_list[0].get('local_path', None)
-    engine_name = jobs_list[0]['engine_name']
-    downloaded_file_url = None
-    if uploaded_project and uploaded_project.filename:
-        referred_name = os.path.basename(uploaded_project.filename)
-    elif project_url:
-        referred_name, downloaded_file_url = download_project_from_url(project_url)
-        if not referred_name:
-            raise ValueError(f"Error downloading file from URL: {project_url}")
-    elif local_path and os.path.exists(local_path):
-        referred_name = os.path.basename(local_path)
-    else:
-        raise ValueError("Cannot find any valid project paths")
-
-    # Prepare the local filepath
-    cleaned_path_name = jobs_list[0].get('name', os.path.splitext(referred_name)[0]).replace(' ', '-')
-    job_dir = os.path.join(upload_directory, '-'.join(
-        [datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), engine_name, cleaned_path_name]))
-    os.makedirs(job_dir, exist_ok=True)
-    project_source_dir = os.path.join(job_dir, 'source')
-    os.makedirs(project_source_dir, exist_ok=True)
-
-    # Move projects to their work directories
-    if uploaded_project and uploaded_project.filename:
-        loaded_project_local_path = os.path.join(project_source_dir, secure_filename(uploaded_project.filename))
-        uploaded_project.save(loaded_project_local_path)
-        logger.info(f"Transfer complete for {loaded_project_local_path.split(upload_directory)[-1]}")
-    elif project_url:
-        loaded_project_local_path = os.path.join(project_source_dir, referred_name)
-        shutil.move(downloaded_file_url, loaded_project_local_path)
-        logger.info(f"Download complete for {loaded_project_local_path.split(upload_directory)[-1]}")
-    elif local_path:
-        loaded_project_local_path = os.path.join(project_source_dir, referred_name)
-        shutil.copy(local_path, loaded_project_local_path)
-        logger.info(f"Import complete for {loaded_project_local_path.split(upload_directory)[-1]}")
-    return loaded_project_local_path, referred_name
-
-
-def download_project_from_url(project_url):
-    # This nested function is to handle downloading from a URL
-    logger.info(f"Downloading project from url: {project_url}")
-    referred_name = os.path.basename(project_url)
-    try:
-        response = requests.get(project_url, stream=True)
-        if response.status_code == 200:
-            # Get the total file size from the "Content-Length" header
-            file_size = int(response.headers.get("Content-Length", 0))
-
-            # Create a progress bar using tqdm
-            progress_bar = tqdm(total=file_size, unit="B", unit_scale=True)
-
-            # Open a file for writing in binary mode
-            downloaded_file_url = os.path.join(tempfile.gettempdir(), referred_name)
-            with open(downloaded_file_url, "wb") as file:
-                for chunk in response.iter_content(chunk_size=1024):
-                    if chunk:
-                        # Write the chunk to the file
-                        file.write(chunk)
-                        # Update the progress bar
-                        progress_bar.update(len(chunk))
-
-            # Close the progress bar
-            progress_bar.close()
-        return referred_name, downloaded_file_url
-    except Exception as e:
-        logger.error(f"Error downloading file: {e}")
-        return None, None
-
-
-def process_zipped_project(zip_path):
-    """
-    Processes a zipped project.
-
-    This method takes a path to a zip file, extracts its contents, and returns the path to the extracted project
-    file. If the zip file contains more than one project file or none, an error is raised.
-
-    Args:
-        zip_path (str): The path to the zip file.
-
-    Raises:
-        ValueError: If there's more than 1 project file or none in the zip file.
-
-    Returns:
-        str: The path to the main project file.
-    """
-    work_path = os.path.dirname(zip_path)
-    try:
-        with zipfile.ZipFile(zip_path, 'r') as myzip:
-            myzip.extractall(work_path)
-        project_files = [x for x in os.listdir(work_path) if os.path.isfile(os.path.join(work_path, x))]
-        project_files = [x for x in project_files if '.zip' not in x]
-        logger.debug(f"Zip files: {project_files}")
-        # supported_exts = RenderWorkerFactory.class_for_name(engine).engine.supported_extensions
-        # if supported_exts:
-        #     project_files = [file for file in project_files if any(file.endswith(ext) for ext in supported_exts)]
-        # If there's more than 1 project file or none, raise an error
-        if len(project_files) != 1:
-            raise ValueError(f'Cannot find a valid project file in {os.path.basename(zip_path)}')
-        extracted_project_path = os.path.join(work_path, project_files[0])
-        logger.info(f"Extracted zip file to {extracted_project_path}")
-    except (zipfile.BadZipFile, zipfile.LargeZipFile) as e:
-        logger.error(f"Error processing zip file: {e}")
-        raise ValueError(f"Error processing zip file: {e}")
-    return extracted_project_path

View File

@@ -17,7 +17,7 @@ import yaml
 from flask import Flask, request, send_file, after_this_request, Response, redirect, url_for
 from sqlalchemy.orm.exc import DetachedInstanceError

-from src.api.add_job_helpers import handle_uploaded_project_files, process_zipped_project
+from src.api.job_import_handler import JobImportHandler
 from src.api.preview_manager import PreviewManager
 from src.distributed_job_manager import DistributedJobManager
 from src.engines.engine_manager import EngineManager
@@ -32,7 +32,7 @@ logger = logging.getLogger()
 server = Flask(__name__)
 ssl._create_default_https_context = ssl._create_unverified_context  # disable SSL for downloads

-API_VERSION = "1"
+API_VERSION = "0.1"


 def start_api_server(hostname=None):
@@ -252,32 +252,66 @@ def status():
 @server.post('/api/add_job')
 def add_job_handler():
-    # Process request data
+    """
+    POST /api/add_job
+    Add a render job to the queue.
+
+    **Request Formats**
+    - JSON body:
+        {
+            "name": "example.blend",
+            "engine_name": "blender",
+            "frame_start": 1,
+            "frame_end": 100,
+            "render_settings": {...},
+            "child_jobs": [...]
+        }
+
+    **Responses**
+    200 Success
+    400 Invalid or missing input
+    500 Internal server error while parsing or creating jobs
+    """
     try:
         if request.is_json:
-            jobs_list = [request.json] if not isinstance(request.json, list) else request.json
+            new_job_data = request.get_json()
         elif request.form.get('json', None):
-            jobs_list = json.loads(request.form['json'])
+            new_job_data = json.loads(request.form['json'])
         else:
-            return "Invalid data", 400
+            return "Cannot find valid job data", 400
     except Exception as e:
         err_msg = f"Error processing job data: {e}"
         logger.error(err_msg)
         return err_msg, 500

+    # Validate Job Data - check for required values and download or unzip project files
     try:
-        loaded_project_local_path, referred_name = handle_uploaded_project_files(request, jobs_list,
-                                                                                 server.config['UPLOAD_FOLDER'])
-        if loaded_project_local_path.lower().endswith('.zip'):
-            loaded_project_local_path = process_zipped_project(loaded_project_local_path)
-
-        results = []
-        for new_job_data in jobs_list:
-            new_job = DistributedJobManager.create_render_job(new_job_data, loaded_project_local_path)
-            results.append(new_job.json())
-        return results, 200
+        processed_job_data = JobImportHandler.validate_job_data(new_job_data, server.config['UPLOAD_FOLDER'],
+                                                                uploaded_file=request.files.get('file'))
+    except (KeyError, FileNotFoundError) as e:
+        err_msg = f"Error processing job data: {e}"
+        return err_msg, 400
     except Exception as e:
-        logger.exception(f"Error adding job: {e}")
+        err_msg = f"Unknown error processing data: {e}"
+        return err_msg, 500
+
+    try:
+        loaded_project_local_path = processed_job_data['__loaded_project_local_path']
+        created_jobs = []
+        if processed_job_data.get("child_jobs"):
+            for child_job_diffs in processed_job_data["child_jobs"]:
+                processed_child_job_data = processed_job_data.copy()
+                processed_child_job_data.pop("child_jobs")
+                processed_child_job_data.update(child_job_diffs)
+                child_job = DistributedJobManager.create_render_job(processed_child_job_data,
+                                                                    loaded_project_local_path)
+                created_jobs.append(child_job)
+        else:
+            new_job = DistributedJobManager.create_render_job(processed_job_data, loaded_project_local_path)
+            created_jobs.append(new_job)
+        return [x.json() for x in created_jobs]
+    except Exception as e:
+        logger.exception(f"Error creating render job: {e}")
         return 'unknown error', 500
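
To make the override semantics concrete, a hedged stand-alone illustration (not part of the commit) of what the loop above does with each "child_jobs" entry:

    # Each child dict is merged over a shallow copy of the parent job.
    parent = {"name": "shot01", "engine_name": "blender",
              "child_jobs": [{"name": "shot01_CamA", "args": {"camera": "CamA"}}]}
    for child_diffs in parent["child_jobs"]:
        child = parent.copy()      # inherit everything from the parent
        child.pop("child_jobs")    # children do not recurse
        child.update(child_diffs)  # child keys override parent keys
        print(child)               # {'name': 'shot01_CamA', 'engine_name': 'blender', 'args': {'camera': 'CamA'}}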

View File

@@ -0,0 +1,145 @@
+#!/usr/bin/env python3
+import logging
+import os
+import shutil
+import tempfile
+import zipfile
+from datetime import datetime
+
+import requests
+from tqdm import tqdm
+from werkzeug.utils import secure_filename
+
+logger = logging.getLogger()
+
+
+class JobImportHandler:
+
+    @classmethod
+    def validate_job_data(cls, new_job_data, upload_directory, uploaded_file=None):
+        loaded_project_local_path = None
+
+        # check for required keys
+        job_name = new_job_data.get('name')
+        engine_name = new_job_data.get('engine_name')
+        if not job_name:
+            raise KeyError("Missing job name")
+        elif not engine_name:
+            raise KeyError("Missing engine name")
+
+        project_url = new_job_data.get('url', None)
+        local_path = new_job_data.get('local_path', None)
+        downloaded_file_url = None
+        if uploaded_file and uploaded_file.filename:
+            referred_name = os.path.basename(uploaded_file.filename)
+        elif project_url:
+            referred_name, downloaded_file_url = cls.download_project_from_url(project_url)
+            if not referred_name:
+                raise FileNotFoundError(f"Error downloading file from URL: {project_url}")
+        elif local_path and os.path.exists(local_path):
+            referred_name = os.path.basename(local_path)
+        else:
+            raise FileNotFoundError("Cannot find any valid project paths")
+
+        # Prepare the local filepath
+        cleaned_path_name = os.path.splitext(referred_name)[0].replace(' ', '-')
+        job_dir = os.path.join(upload_directory, '-'.join(
+            [datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), engine_name, cleaned_path_name]))
+        os.makedirs(job_dir, exist_ok=True)
+        project_source_dir = os.path.join(job_dir, 'source')
+        os.makedirs(project_source_dir, exist_ok=True)
+
+        # Move projects to their work directories
+        if uploaded_file and uploaded_file.filename:
+            loaded_project_local_path = os.path.join(project_source_dir, secure_filename(uploaded_file.filename))
+            uploaded_file.save(loaded_project_local_path)
+            logger.info(f"Transfer complete for {loaded_project_local_path.split(upload_directory)[-1]}")
+        elif project_url:
+            loaded_project_local_path = os.path.join(project_source_dir, referred_name)
+            shutil.move(downloaded_file_url, loaded_project_local_path)
+            logger.info(f"Download complete for {loaded_project_local_path.split(upload_directory)[-1]}")
+        elif local_path:
+            loaded_project_local_path = os.path.join(project_source_dir, referred_name)
+            shutil.copy(local_path, loaded_project_local_path)
+            logger.info(f"Import complete for {loaded_project_local_path.split(upload_directory)[-1]}")
+
+        if loaded_project_local_path.lower().endswith('.zip'):
+            loaded_project_local_path = cls.process_zipped_project(loaded_project_local_path)
+
+        new_job_data["__loaded_project_local_path"] = loaded_project_local_path
+        return new_job_data
+
+    @staticmethod
+    def download_project_from_url(project_url):
+        # Handles downloading a project file from a URL
+        logger.info(f"Downloading project from url: {project_url}")
+        referred_name = os.path.basename(project_url)
+        try:
+            response = requests.get(project_url, stream=True)
+            if response.status_code == 200:
+                # Get the total file size from the "Content-Length" header
+                file_size = int(response.headers.get("Content-Length", 0))
+
+                # Create a progress bar using tqdm
+                progress_bar = tqdm(total=file_size, unit="B", unit_scale=True)
+
+                # Open a file for writing in binary mode
+                downloaded_file_url = os.path.join(tempfile.gettempdir(), referred_name)
+                with open(downloaded_file_url, "wb") as file:
+                    for chunk in response.iter_content(chunk_size=1024):
+                        if chunk:
+                            # Write the chunk to the file
+                            file.write(chunk)
+                            # Update the progress bar
+                            progress_bar.update(len(chunk))
+
+                # Close the progress bar
+                progress_bar.close()
+            return referred_name, downloaded_file_url
+        except Exception as e:
+            logger.error(f"Error downloading file: {e}")
+            return None, None
+
+    @staticmethod
+    def process_zipped_project(zip_path):
+        """
+        Processes a zipped project.
+
+        This method takes a path to a zip file, extracts its contents, and returns the path to the extracted
+        project file. If the zip file contains more than one project file or none, an error is raised.
+
+        Args:
+            zip_path (str): The path to the zip file.
+
+        Raises:
+            ValueError: If there's more than 1 project file or none in the zip file.
+
+        Returns:
+            str: The path to the main project file.
+        """
+        work_path = os.path.dirname(zip_path)
+        try:
+            with zipfile.ZipFile(zip_path, 'r') as myzip:
+                myzip.extractall(work_path)
+            project_files = [x for x in os.listdir(work_path) if os.path.isfile(os.path.join(work_path, x))]
+            project_files = [x for x in project_files if '.zip' not in x]
+            logger.debug(f"Zip files: {project_files}")
+            # supported_exts = RenderWorkerFactory.class_for_name(engine).engine.supported_extensions
+            # if supported_exts:
+            #     project_files = [file for file in project_files if any(file.endswith(ext) for ext in supported_exts)]
+            # If there's more than 1 project file or none, raise an error
+            if len(project_files) != 1:
+                raise ValueError(f'Cannot find a valid project file in {os.path.basename(zip_path)}')
+            extracted_project_path = os.path.join(work_path, project_files[0])
+            logger.info(f"Extracted zip file to {extracted_project_path}")
+        except (zipfile.BadZipFile, zipfile.LargeZipFile) as e:
+            logger.error(f"Error processing zip file: {e}")
+            raise ValueError(f"Error processing zip file: {e}")
+        return extracted_project_path
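
A hedged usage sketch (not part of the commit) of how the API server drives this class; the paths are illustrative assumptions. validate_job_data raises KeyError for a missing name/engine_name and FileNotFoundError when no project source can be resolved.

    job = {"name": "example", "engine_name": "blender",
           "local_path": "/projects/example.blend"}  # must exist on this machine
    processed = JobImportHandler.validate_job_data(job, upload_directory="/tmp/zordon-uploads")
    print(processed["__loaded_project_local_path"])  # where the copied (and unzipped, if needed) project now lives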

View File

@@ -184,51 +184,44 @@ class RenderServerProxy:
     # Job Lifecycle:
     # --------------------------------------------
-    def post_job_to_server(self, file_path, job_list, callback=None):
+    def post_job_to_server(self, file_path, job_data, callback=None):
         """
         Posts a job to the server.

         Args:
             file_path (str): The path to the file to upload.
-            job_list (list): A list of jobs to post.
+            job_data (dict): A dict of job data.
             callback (function, optional): A callback function to call during the upload. Defaults to None.

         Returns:
             Response: The response from the server.
         """
-        try:
-            # Check if file exists
-            if not os.path.exists(file_path):
-                raise FileNotFoundError(f"File not found: {file_path}")
+        # Check if file exists
+        if not os.path.exists(file_path):
+            raise FileNotFoundError(f"File not found: {file_path}")

-            # Bypass uploading file if posting to localhost
-            if self.is_localhost:
-                jobs_with_path = [{'local_path': file_path, **item} for item in job_list]
-                job_data = json.dumps(jobs_with_path)
-                url = urljoin(f'http://{self.hostname}:{self.port}', '/api/add_job')
-                headers = {'Content-Type': 'application/json'}
-                return requests.post(url, data=job_data, headers=headers)
+        # Bypass uploading file if posting to localhost
+        if self.is_localhost:
+            job_data['local_path'] = file_path
+            url = urljoin(f'http://{self.hostname}:{self.port}', '/api/add_job')
+            headers = {'Content-Type': 'application/json'}
+            return requests.post(url, data=json.dumps(job_data), headers=headers)

-            # Prepare the form data for remote host
-            with open(file_path, 'rb') as file:
-                encoder = MultipartEncoder({
-                    'file': (os.path.basename(file_path), file, 'application/octet-stream'),
-                    'json': (None, json.dumps(job_list), 'application/json'),
-                })
+        # Prepare the form data for remote host
+        with open(file_path, 'rb') as file:
+            encoder = MultipartEncoder({
+                'file': (os.path.basename(file_path), file, 'application/octet-stream'),
+                'json': (None, json.dumps(job_data), 'application/json'),
+            })

-                # Create a monitor that will track the upload progress
-                monitor = MultipartEncoderMonitor(encoder, callback) if callback else MultipartEncoderMonitor(encoder)
-                headers = {'Content-Type': monitor.content_type}
-                url = urljoin(f'http://{self.hostname}:{self.port}', '/api/add_job')
+            # Create a monitor that will track the upload progress
+            monitor = MultipartEncoderMonitor(encoder, callback) if callback else MultipartEncoderMonitor(encoder)
+            headers = {'Content-Type': monitor.content_type}
+            url = urljoin(f'http://{self.hostname}:{self.port}', '/api/add_job')

-                # Send the request with proper resource management
-                with requests.post(url, data=monitor, headers=headers) as response:
-                    return response
-        except requests.ConnectionError as e:
-            logger.error(f"Connection error: {e}")
-        except Exception as e:
-            logger.error(f"An error occurred: {e}")
+            # Send the request with proper resource management
+            with requests.post(url, data=monitor, headers=headers) as response:
+                return response

     def cancel_job(self, job_id, confirm=False):
         return self.request_data(f'job/{job_id}/cancel?confirm={confirm}')
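
Since post_job_to_server no longer swallows exceptions, callers own the error handling. A hedged sketch of the caller-side pattern, assuming RenderServerProxy and requests are importable in scope; the hostname and paths are illustrative:

    proxy = RenderServerProxy("render-box.local")
    try:
        response = proxy.post_job_to_server("/projects/example.blend",
                                            {"name": "example", "engine_name": "blender"})
    except FileNotFoundError as e:
        print(f"Bad input: {e}")           # raised before any network I/O
    except requests.ConnectionError as e:
        print(f"Server unreachable: {e}")  # previously logged and buried, now surfaced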

View File

@@ -156,7 +156,7 @@ class DistributedJobManager:
         logger.debug(f"New job output path: {output_path}")

         # create & configure jobs
-        worker = EngineManager.create_worker(engine_name=new_job_attributes['engine'],
+        worker = EngineManager.create_worker(engine_name=new_job_attributes['engine_name'],
                                              input_path=loaded_project_local_path,
                                              output_path=output_path,
                                              engine_version=new_job_attributes.get('engine_version'),
@@ -358,7 +358,7 @@ class DistributedJobManager:
         logger.debug(f"Posting subjob with frames {subjob['start_frame']}-"
                      f"{subjob['end_frame']} to {server_hostname}")
         post_results = RenderServerProxy(server_hostname).post_job_to_server(
-            file_path=project_path, job_list=[subjob])
+            file_path=project_path, job_data=subjob)
         return post_results

     # --------------------------------------------

View File

@@ -450,7 +450,7 @@ class SubmitWorker(QThread):
        try:
            hostname = self.window.server_input.currentText()
            job_json = {'owner': psutil.Process().username() + '@' + socket.gethostname(),
-                       'engine': self.window.engine_type.currentText().lower(),
+                       'engine_name': self.window.engine_type.currentText().lower(),
                        'engine_version': self.window.engine_version_combo.currentText(),
                        'args': {'raw': self.window.raw_args.text(),
                                 'export_format': self.window.file_format_combo.currentText()},
@@ -485,26 +485,26 @@
            # process cameras into nested format
            input_path = self.window.scene_file_input.text()
-           if selected_cameras:
-               job_list = []
+           if selected_cameras and self.window.cameras_list.count() > 1:
+               children_jobs = []
                for cam in selected_cameras:
-                   job_copy = copy.deepcopy(job_json)
-                   job_copy['args']['camera'] = cam
-                   job_copy['name'] = job_copy['name'].replace(' ', '-') + "_" + cam.replace(' ', '')
-                   job_copy['output_path'] = job_copy['name']
-                   job_list.append(job_copy)
-           else:
-               job_list = [job_json]
+                   child_job_data = dict()
+                   child_job_data['args'] = {}
+                   child_job_data['args']['camera'] = cam
+                   child_job_data['name'] = job_json['name'].replace(' ', '-') + "_" + cam.replace(' ', '')
+                   child_job_data['output_path'] = child_job_data['name']
+                   children_jobs.append(child_job_data)
+               job_json['child_jobs'] = children_jobs

            # presubmission tasks
            engine = EngineManager.engine_with_name(self.window.engine_type.currentText().lower())
            input_path = engine().perform_presubmission_tasks(input_path)

            # submit
            err_msg = ""
-           result = self.window.server_proxy.post_job_to_server(file_path=input_path, job_list=job_list,
+           result = self.window.server_proxy.post_job_to_server(file_path=input_path, job_data=job_json,
                                                                 callback=create_callback)
            if not (result and result.ok):
-               err_msg = "Error posting job to server."
+               err_msg = f"Error posting job to server: {result.text if result else 'no response'}"
            self.message_signal.emit(err_msg)

View File

@@ -211,7 +211,7 @@ def check_for_updates(repo_name, repo_owner, app_name, current_version):
     releases = get_github_releases(repo_owner, repo_name)
     if not releases:
-        return
+        return None

     latest_version = releases[0]
     latest_version_tag = latest_version['tag_name']
@@ -221,7 +221,7 @@ def check_for_updates(repo_name, repo_owner, app_name, current_version):
         logger.info(f"Newer version of {app_name} available. "
                     f"Latest: {latest_version_tag}, Current: {current_version}")
         return latest_version
+    return None


 def is_localhost(comparison_hostname):
     # this is necessary because socket.gethostname() does not always include '.local' - This is a sanitized comparison