From d8af7c878e22ab341b751b6ac04e6a3f2dad17c9 Mon Sep 17 00:00:00 2001 From: Brett Date: Wed, 31 Dec 2025 23:14:28 -0600 Subject: [PATCH] Job submission code and API cleanup (#127) * Refactor add jobs and make add_job api only be one job (instead of a list) * Renamed to JobImportHandler and misc cleanup * Dont bury exceptions in server proxy post_job * Update code to create child jobs in a cleaner manner --- add_job.py | 29 +++++-- job_launcher.py | 120 -------------------------- server.py | 25 +++--- src/api/add_job_helpers.py | 150 --------------------------------- src/api/api_server.py | 68 +++++++++++---- src/api/job_import_handler.py | 145 +++++++++++++++++++++++++++++++ src/api/server_proxy.py | 55 ++++++------ src/distributed_job_manager.py | 4 +- src/ui/add_job_window.py | 24 +++--- src/utilities/misc_helper.py | 4 +- 10 files changed, 267 insertions(+), 357 deletions(-) delete mode 100644 job_launcher.py delete mode 100644 src/api/add_job_helpers.py create mode 100644 src/api/job_import_handler.py diff --git a/add_job.py b/add_job.py index 4bd034b..ec0ece3 100644 --- a/add_job.py +++ b/add_job.py @@ -5,10 +5,9 @@ import logging import os import socket import sys -import threading import time -from server import start_server +from server import ZordonServer from src.api.serverproxy_manager import ServerProxyManager logger = logging.getLogger() @@ -84,23 +83,30 @@ def main(): found_proxy = ServerProxyManager.get_proxy_for_hostname(local_hostname) is_connected = found_proxy.check_connection() + adhoc_server = None if not is_connected: - local_server_thread = threading.Thread(target=start_server, args=[True], daemon=True) - local_server_thread.start() + adhoc_server = ZordonServer() + adhoc_server.start_server() + found_proxy = ServerProxyManager.get_proxy_for_hostname(adhoc_server.server_hostname) while not is_connected: # todo: add timeout - # is_connected = found_proxy.check_connection() + is_connected = found_proxy.check_connection() time.sleep(1) - new_job = {"name": job_name, "engine": args.engine} - response = found_proxy.post_job_to_server(file_path, [new_job]) + new_job = {"name": job_name, "engine_name": args.engine} + try: + response = found_proxy.post_job_to_server(file_path, new_job) + except Exception as e: + print(f"Error creating job: {e}") + exit(1) + if response and response.ok: print(f"Uploaded to {found_proxy.hostname} successfully!") - running_job_data = response.json()[0] + running_job_data = response.json() job_id = running_job_data.get('id') print(f"Job {job_id} Summary:") print(f" Status : {running_job_data.get('status')}") - print(f" Engine : {running_job_data.get('engine')}-{running_job_data.get('engine_version')}") + print(f" Engine : {running_job_data.get('engine_name')}-{running_job_data.get('engine_version')}") print("\nWaiting for render to complete...") percent_complete = 0.0 @@ -114,6 +120,11 @@ def main(): print(f"Percent Complete: {percent_complete:.2%}") sys.stdout.flush() print("Finished rendering successfully!") + else: + print(f"Failed to upload job. {response.text} !") + + if adhoc_server: + adhoc_server.stop_server() if __name__ == "__main__": diff --git a/job_launcher.py b/job_launcher.py deleted file mode 100644 index 7290ae3..0000000 --- a/job_launcher.py +++ /dev/null @@ -1,120 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import logging -import os -import socket -import sys -import threading -import time - -from server import start_server -from src.api.serverproxy_manager import ServerProxyManager - -logger = logging.getLogger() - -def main(): - parser = argparse.ArgumentParser( - description="Zordon CLI tool for preparing/submitting a render job", - formatter_class=argparse.ArgumentDefaultsHelpFormatter - ) - - # Required arguments - parser.add_argument("scene_file", help="Path to the scene file (e.g., .blend, .max, .mp4)") - parser.add_argument("engine", help="Desired render engine", choices=['blender', 'ffmpeg']) - - # Frame range - parser.add_argument("--start", type=int, default=1, help="Start frame") - parser.add_argument("--end", type=int, default=1, help="End frame") - - # Job metadata - parser.add_argument("--name", default=None, help="Job name") - - # Output - parser.add_argument("--output", default="", help="Output path/pattern (e.g., /renders/frame_####.exr)") - - # Target OS and Engine Version - parser.add_argument( - "--os", - choices=["any", "windows", "linux", "macos"], - default="any", - help="Target operating system for render workers" - ) - parser.add_argument( - "--engine-version", - default="latest", - help="Required renderer/engine version number (e.g., '4.2', '5.0')" - ) - - # Optional flags - parser.add_argument("--dry-run", action="store_true", help="Print job details without submitting") - - args = parser.parse_args() - - # Basic validation - if not os.path.exists(args.scene_file): - print(f"Error: Scene file '{args.scene_file}' not found!", file=sys.stderr) - sys.exit(1) - - if args.start > args.end: - print("Error: Start frame cannot be greater than end frame!", file=sys.stderr) - sys.exit(1) - - # Calculate total frames - total_frames = len(range(args.start, args.end + 1)) - job_name = args.name or os.path.basename(args.scene_file) - file_path = os.path.abspath(args.scene_file) - - # Print job summary - print("Render Job Summary:") - print(f" Job Name : {job_name}") - print(f" Scene File : {file_path}") - print(f" Engine : {args.engine}") - print(f" Frames : {args.start}-{args.end} → {total_frames} frames") - print(f" Output Path : {args.output or '(default from scene)'}") - print(f" Target OS : {args.os}") - print(f" Engine Version : {args.engine_version}") - - if args.dry_run: - print("\nDry run complete (no submission performed).") - return - - local_hostname = socket.gethostname() - local_hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "") - found_proxy = ServerProxyManager.get_proxy_for_hostname(local_hostname) - - is_connected = found_proxy.check_connection() - if not is_connected: - local_server_thread = threading.Thread(target=start_server, args=[True], daemon=True) - local_server_thread.start() - while not is_connected: - # todo: add timeout - # is_connected = found_proxy.check_connection() - time.sleep(1) - - new_job = {"name": job_name, "renderer": args.engine} - response = found_proxy.post_job_to_server(file_path, [new_job]) - if response and response.ok: - print(f"Uploaded to {found_proxy.hostname} successfully!") - running_job_data = response.json()[0] - job_id = running_job_data.get('id') - print(f"Job {job_id} Summary:") - print(f" Status : {running_job_data.get('status')}") - print(f" Engine : {running_job_data.get('renderer')}-{running_job_data.get('renderer_version')}") - - print("\nWaiting for render to complete...") - percent_complete = 0.0 - while percent_complete < 1.0: - # add checks for errors - time.sleep(1) - running_job_data = found_proxy.get_job_info(job_id) - percent_complete = running_job_data['percent_complete'] - sys.stdout.write("\x1b[1A") # Move up 1 - sys.stdout.write("\x1b[0J") # Clear from cursor to end of screen (optional) - print(f"Percent Complete: {percent_complete:.2%}") - sys.stdout.flush() - print("Finished rendering successfully!") - - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/server.py b/server.py index 97f4908..424c739 100755 --- a/server.py +++ b/server.py @@ -2,10 +2,8 @@ import logging import multiprocessing import os import socket -import sys import threading -import cpuinfo import psutil from src.api.api_server import API_VERSION @@ -19,7 +17,7 @@ from src.utilities.config import Config from src.utilities.misc_helper import (get_gpu_info, system_safe_path, current_system_cpu, current_system_os, current_system_os_version, current_system_cpu_brand, check_for_updates) from src.utilities.zeroconf_server import ZeroconfServer -from src.version import APP_NAME, APP_VERSION, APP_REPO_NAME, APP_REPO_OWNER +from src.version import APP_NAME, APP_VERSION logger = logging.getLogger() @@ -45,12 +43,8 @@ class ZordonServer: PreviewManager.storage_path = system_safe_path( os.path.join(os.path.expanduser(Config.upload_folder), 'previews')) - # Debug info - logger.debug(f"Upload directory: {os.path.expanduser(Config.upload_folder)}") - logger.debug(f"Thumbs directory: {PreviewManager.storage_path}") - logger.debug(f"Engines directory: {EngineManager.engines_path}") - self.api_server = None + self.server_hostname = None def start_server(self): @@ -77,22 +71,25 @@ class ZordonServer: raise ProcessLookupError(err_msg) # main start - logger.info(f"Starting {APP_NAME} Render Server") + logger.info(f"Starting {APP_NAME} Render Server ({APP_VERSION})") + logger.debug(f"Upload directory: {os.path.expanduser(Config.upload_folder)}") + logger.debug(f"Thumbs directory: {PreviewManager.storage_path}") + logger.debug(f"Engines directory: {EngineManager.engines_path}") # Set up the RenderQueue object RenderQueue.load_state(database_directory=system_safe_path(os.path.expanduser(Config.upload_folder))) ServerProxyManager.subscribe_to_listener() DistributedJobManager.subscribe_to_listener() # get hostname - local_hostname = socket.gethostname() + self.server_hostname = socket.gethostname() # configure and start API server - self.api_server = threading.Thread(target=start_api_server, args=(local_hostname,)) + self.api_server = threading.Thread(target=start_api_server, args=(self.server_hostname,)) self.api_server.daemon = True self.api_server.start() # start zeroconf server - ZeroconfServer.configure(f"_{APP_NAME.lower()}._tcp.local.", local_hostname, Config.port_number) + ZeroconfServer.configure(f"_{APP_NAME.lower()}._tcp.local.", self.server_hostname, Config.port_number) ZeroconfServer.properties = {'system_cpu': current_system_cpu(), 'system_cpu_brand': current_system_cpu_brand(), 'system_cpu_cores': multiprocessing.cpu_count(), @@ -102,7 +99,7 @@ class ZordonServer: 'gpu_info': get_gpu_info(), 'api_version': API_VERSION} ZeroconfServer.start() - logger.info(f"{APP_NAME} Render Server started - Hostname: {local_hostname}") + logger.info(f"{APP_NAME} Render Server started - Hostname: {self.server_hostname}") RenderQueue.start() # Start evaluating the render queue def is_running(self): @@ -125,6 +122,6 @@ if __name__ == '__main__': except KeyboardInterrupt: pass except Exception as e: - logger.error(f"Unhandled exception: {e}") + logger.fatal(f"Unhandled exception: {e}") finally: server.stop_server() diff --git a/src/api/add_job_helpers.py b/src/api/add_job_helpers.py deleted file mode 100644 index f991412..0000000 --- a/src/api/add_job_helpers.py +++ /dev/null @@ -1,150 +0,0 @@ -#!/usr/bin/env python3 -import logging -import os -import shutil -import tempfile -import zipfile -from datetime import datetime - -import requests -from tqdm import tqdm -from werkzeug.utils import secure_filename - -logger = logging.getLogger() - - -def handle_uploaded_project_files(request, jobs_list, upload_directory): - """ - Handles the uploaded project files. - - This method takes a request with a file, a list of jobs, and an upload directory. It checks if the file was uploaded - directly, if it needs to be downloaded from a URL, or if it's already present on the local file system. It then - moves the file to the appropriate directory and returns the local path to the file and its name. - - Args: - request (Request): The request object containing the file. - jobs_list (list): A list of jobs. The first job in the list is used to get the file's URL and local path. - upload_directory (str): The directory where the file should be uploaded. - - Raises: - ValueError: If no valid project paths are found. - - Returns: - tuple: A tuple containing the local path to the loaded project file and its name. - """ - # Initialize default values - loaded_project_local_path = None - - uploaded_project = request.files.get('file', None) - project_url = jobs_list[0].get('url', None) - local_path = jobs_list[0].get('local_path', None) - engine_name = jobs_list[0]['engine_name'] - downloaded_file_url = None - - if uploaded_project and uploaded_project.filename: - referred_name = os.path.basename(uploaded_project.filename) - elif project_url: - referred_name, downloaded_file_url = download_project_from_url(project_url) - if not referred_name: - raise ValueError(f"Error downloading file from URL: {project_url}") - elif local_path and os.path.exists(local_path): - referred_name = os.path.basename(local_path) - else: - raise ValueError("Cannot find any valid project paths") - - # Prepare the local filepath - cleaned_path_name = jobs_list[0].get('name', os.path.splitext(referred_name)[0]).replace(' ', '-') - job_dir = os.path.join(upload_directory, '-'.join( - [datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), engine_name, cleaned_path_name])) - os.makedirs(job_dir, exist_ok=True) - project_source_dir = os.path.join(job_dir, 'source') - os.makedirs(project_source_dir, exist_ok=True) - - # Move projects to their work directories - if uploaded_project and uploaded_project.filename: - loaded_project_local_path = os.path.join(project_source_dir, secure_filename(uploaded_project.filename)) - uploaded_project.save(loaded_project_local_path) - logger.info(f"Transfer complete for {loaded_project_local_path.split(upload_directory)[-1]}") - elif project_url: - loaded_project_local_path = os.path.join(project_source_dir, referred_name) - shutil.move(downloaded_file_url, loaded_project_local_path) - logger.info(f"Download complete for {loaded_project_local_path.split(upload_directory)[-1]}") - elif local_path: - loaded_project_local_path = os.path.join(project_source_dir, referred_name) - shutil.copy(local_path, loaded_project_local_path) - logger.info(f"Import complete for {loaded_project_local_path.split(upload_directory)[-1]}") - - return loaded_project_local_path, referred_name - - -def download_project_from_url(project_url): - # This nested function is to handle downloading from a URL - logger.info(f"Downloading project from url: {project_url}") - referred_name = os.path.basename(project_url) - - try: - response = requests.get(project_url, stream=True) - if response.status_code == 200: - # Get the total file size from the "Content-Length" header - file_size = int(response.headers.get("Content-Length", 0)) - # Create a progress bar using tqdm - progress_bar = tqdm(total=file_size, unit="B", unit_scale=True) - # Open a file for writing in binary mode - downloaded_file_url = os.path.join(tempfile.gettempdir(), referred_name) - with open(downloaded_file_url, "wb") as file: - for chunk in response.iter_content(chunk_size=1024): - if chunk: - # Write the chunk to the file - file.write(chunk) - # Update the progress bar - progress_bar.update(len(chunk)) - # Close the progress bar - progress_bar.close() - return referred_name, downloaded_file_url - except Exception as e: - logger.error(f"Error downloading file: {e}") - return None, None - - -def process_zipped_project(zip_path): - """ - Processes a zipped project. - - This method takes a path to a zip file, extracts its contents, and returns the path to the extracted project file. - If the zip file contains more than one project file or none, an error is raised. - - Args: - zip_path (str): The path to the zip file. - - Raises: - ValueError: If there's more than 1 project file or none in the zip file. - - Returns: - str: The path to the main project file. - """ - work_path = os.path.dirname(zip_path) - - try: - with zipfile.ZipFile(zip_path, 'r') as myzip: - myzip.extractall(work_path) - - project_files = [x for x in os.listdir(work_path) if os.path.isfile(os.path.join(work_path, x))] - project_files = [x for x in project_files if '.zip' not in x] - - logger.debug(f"Zip files: {project_files}") - - # supported_exts = RenderWorkerFactory.class_for_name(engine).engine.supported_extensions - # if supported_exts: - # project_files = [file for file in project_files if any(file.endswith(ext) for ext in supported_exts)] - - # If there's more than 1 project file or none, raise an error - if len(project_files) != 1: - raise ValueError(f'Cannot find a valid project file in {os.path.basename(zip_path)}') - - extracted_project_path = os.path.join(work_path, project_files[0]) - logger.info(f"Extracted zip file to {extracted_project_path}") - - except (zipfile.BadZipFile, zipfile.LargeZipFile) as e: - logger.error(f"Error processing zip file: {e}") - raise ValueError(f"Error processing zip file: {e}") - return extracted_project_path diff --git a/src/api/api_server.py b/src/api/api_server.py index 6e09120..34568e5 100755 --- a/src/api/api_server.py +++ b/src/api/api_server.py @@ -17,7 +17,7 @@ import yaml from flask import Flask, request, send_file, after_this_request, Response, redirect, url_for from sqlalchemy.orm.exc import DetachedInstanceError -from src.api.add_job_helpers import handle_uploaded_project_files, process_zipped_project +from src.api.job_import_handler import JobImportHandler from src.api.preview_manager import PreviewManager from src.distributed_job_manager import DistributedJobManager from src.engines.engine_manager import EngineManager @@ -32,7 +32,7 @@ logger = logging.getLogger() server = Flask(__name__) ssl._create_default_https_context = ssl._create_unverified_context # disable SSL for downloads -API_VERSION = "1" +API_VERSION = "0.1" def start_api_server(hostname=None): @@ -252,32 +252,66 @@ def status(): @server.post('/api/add_job') def add_job_handler(): - # Process request data + """ + POST /api/add_job + Add a render job to the queue. + + **Request Formats** + - JSON body: + { + "name": "example.blend", + "engine": "blender", + "frame_start": 1, + "frame_end": 100, + "render_settings": {...} + "child_jobs"; [...] + } + + **Responses** + 200 Success + 400 Invalid or missing input + 500 Internal server error while parsing or creating jobs + """ try: if request.is_json: - jobs_list = [request.json] if not isinstance(request.json, list) else request.json + new_job_data = request.get_json() elif request.form.get('json', None): - jobs_list = json.loads(request.form['json']) + new_job_data = json.loads(request.form['json']) else: - return "Invalid data", 400 + return "Cannot find valid job data", 400 except Exception as e: err_msg = f"Error processing job data: {e}" logger.error(err_msg) return err_msg, 500 + # Validate Job Data - check for required values and download or unzip project files try: - loaded_project_local_path, referred_name = handle_uploaded_project_files(request, jobs_list, - server.config['UPLOAD_FOLDER']) - if loaded_project_local_path.lower().endswith('.zip'): - loaded_project_local_path = process_zipped_project(loaded_project_local_path) - - results = [] - for new_job_data in jobs_list: - new_job = DistributedJobManager.create_render_job(new_job_data, loaded_project_local_path) - results.append(new_job.json()) - return results, 200 + processed_job_data = JobImportHandler.validate_job_data(new_job_data, server.config['UPLOAD_FOLDER'], + uploaded_file=request.files.get('file')) + except (KeyError, FileNotFoundError) as e: + err_msg = f"Error processing job data: {e}" + return err_msg, 400 except Exception as e: - logger.exception(f"Error adding job: {e}") + err_msg = f"Unknown error processing data: {e}" + return err_msg, 500 + + try: + loaded_project_local_path = processed_job_data['__loaded_project_local_path'] + created_jobs = [] + if processed_job_data.get("child_jobs"): + for child_job_diffs in processed_job_data["child_jobs"]: + processed_child_job_data = processed_job_data.copy() + processed_child_job_data.pop("child_jobs") + processed_child_job_data.update(child_job_diffs) + child_job = DistributedJobManager.create_render_job(processed_child_job_data, loaded_project_local_path) + created_jobs.append(child_job) + else: + new_job = DistributedJobManager.create_render_job(processed_job_data, loaded_project_local_path) + created_jobs.append(new_job) + + return [x.json() for x in created_jobs] + except Exception as e: + logger.exception(f"Error creating render job: {e}") return 'unknown error', 500 diff --git a/src/api/job_import_handler.py b/src/api/job_import_handler.py new file mode 100644 index 0000000..f8dc606 --- /dev/null +++ b/src/api/job_import_handler.py @@ -0,0 +1,145 @@ +#!/usr/bin/env python3 +import logging +import os +import shutil +import tempfile +import zipfile +from datetime import datetime + +import requests +from tqdm import tqdm +from werkzeug.utils import secure_filename + +logger = logging.getLogger() + + +class JobImportHandler: + + @classmethod + def validate_job_data(cls, new_job_data, upload_directory, uploaded_file=None): + loaded_project_local_path = None + + # check for required keys + job_name = new_job_data.get('name') + engine_name = new_job_data.get('engine_name') + if not job_name: + raise KeyError("Missing job name") + elif not engine_name: + raise KeyError("Missing engine name") + + project_url = new_job_data.get('url', None) + local_path = new_job_data.get('local_path', None) + downloaded_file_url = None + + if uploaded_file and uploaded_file.filename: + referred_name = os.path.basename(uploaded_file.filename) + elif project_url: + referred_name, downloaded_file_url = cls.download_project_from_url(project_url) + if not referred_name: + raise FileNotFoundError(f"Error downloading file from URL: {project_url}") + elif local_path and os.path.exists(local_path): + referred_name = os.path.basename(local_path) + else: + raise FileNotFoundError("Cannot find any valid project paths") + + # Prepare the local filepath + cleaned_path_name = os.path.splitext(referred_name)[0].replace(' ', '-') + job_dir = os.path.join(upload_directory, '-'.join( + [datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), engine_name, cleaned_path_name])) + os.makedirs(job_dir, exist_ok=True) + project_source_dir = os.path.join(job_dir, 'source') + os.makedirs(project_source_dir, exist_ok=True) + + # Move projects to their work directories + if uploaded_file and uploaded_file.filename: + loaded_project_local_path = os.path.join(project_source_dir, secure_filename(uploaded_file.filename)) + uploaded_file.save(loaded_project_local_path) + logger.info(f"Transfer complete for {loaded_project_local_path.split(upload_directory)[-1]}") + elif project_url: + loaded_project_local_path = os.path.join(project_source_dir, referred_name) + shutil.move(downloaded_file_url, loaded_project_local_path) + logger.info(f"Download complete for {loaded_project_local_path.split(upload_directory)[-1]}") + elif local_path: + loaded_project_local_path = os.path.join(project_source_dir, referred_name) + shutil.copy(local_path, loaded_project_local_path) + logger.info(f"Import complete for {loaded_project_local_path.split(upload_directory)[-1]}") + + if loaded_project_local_path.lower().endswith('.zip'): + loaded_project_local_path = cls.process_zipped_project(loaded_project_local_path) + + new_job_data["__loaded_project_local_path"] = loaded_project_local_path + + return new_job_data + + @staticmethod + def download_project_from_url(project_url): + # This nested function is to handle downloading from a URL + logger.info(f"Downloading project from url: {project_url}") + referred_name = os.path.basename(project_url) + + try: + response = requests.get(project_url, stream=True) + if response.status_code == 200: + # Get the total file size from the "Content-Length" header + file_size = int(response.headers.get("Content-Length", 0)) + # Create a progress bar using tqdm + progress_bar = tqdm(total=file_size, unit="B", unit_scale=True) + # Open a file for writing in binary mode + downloaded_file_url = os.path.join(tempfile.gettempdir(), referred_name) + with open(downloaded_file_url, "wb") as file: + for chunk in response.iter_content(chunk_size=1024): + if chunk: + # Write the chunk to the file + file.write(chunk) + # Update the progress bar + progress_bar.update(len(chunk)) + # Close the progress bar + progress_bar.close() + return referred_name, downloaded_file_url + except Exception as e: + logger.error(f"Error downloading file: {e}") + return None, None + + @staticmethod + def process_zipped_project(zip_path): + """ + Processes a zipped project. + + This method takes a path to a zip file, extracts its contents, and returns the path to the extracted project file. + If the zip file contains more than one project file or none, an error is raised. + + Args: + zip_path (str): The path to the zip file. + + Raises: + ValueError: If there's more than 1 project file or none in the zip file. + + Returns: + str: The path to the main project file. + """ + work_path = os.path.dirname(zip_path) + + try: + with zipfile.ZipFile(zip_path, 'r') as myzip: + myzip.extractall(work_path) + + project_files = [x for x in os.listdir(work_path) if os.path.isfile(os.path.join(work_path, x))] + project_files = [x for x in project_files if '.zip' not in x] + + logger.debug(f"Zip files: {project_files}") + + # supported_exts = RenderWorkerFactory.class_for_name(engine).engine.supported_extensions + # if supported_exts: + # project_files = [file for file in project_files if any(file.endswith(ext) for ext in supported_exts)] + + # If there's more than 1 project file or none, raise an error + if len(project_files) != 1: + raise ValueError(f'Cannot find a valid project file in {os.path.basename(zip_path)}') + + extracted_project_path = os.path.join(work_path, project_files[0]) + logger.info(f"Extracted zip file to {extracted_project_path}") + + except (zipfile.BadZipFile, zipfile.LargeZipFile) as e: + logger.error(f"Error processing zip file: {e}") + raise ValueError(f"Error processing zip file: {e}") + return extracted_project_path diff --git a/src/api/server_proxy.py b/src/api/server_proxy.py index 73bb469..bbf2d7a 100644 --- a/src/api/server_proxy.py +++ b/src/api/server_proxy.py @@ -184,51 +184,44 @@ class RenderServerProxy: # Job Lifecycle: # -------------------------------------------- - def post_job_to_server(self, file_path, job_list, callback=None): + def post_job_to_server(self, file_path, job_data, callback=None): """ Posts a job to the server. Args: file_path (str): The path to the file to upload. - job_list (list): A list of jobs to post. + job_data (dict): A dict of jobs data. callback (function, optional): A callback function to call during the upload. Defaults to None. Returns: Response: The response from the server. """ - try: - # Check if file exists - if not os.path.exists(file_path): - raise FileNotFoundError(f"File not found: {file_path}") + # Check if file exists + if not os.path.exists(file_path): + raise FileNotFoundError(f"File not found: {file_path}") - # Bypass uploading file if posting to localhost - if self.is_localhost: - jobs_with_path = [{'local_path': file_path, **item} for item in job_list] - job_data = json.dumps(jobs_with_path) - url = urljoin(f'http://{self.hostname}:{self.port}', '/api/add_job') - headers = {'Content-Type': 'application/json'} - return requests.post(url, data=job_data, headers=headers) + # Bypass uploading file if posting to localhost + if self.is_localhost: + job_data['local_path'] = file_path + url = urljoin(f'http://{self.hostname}:{self.port}', '/api/add_job') + headers = {'Content-Type': 'application/json'} + return requests.post(url, data=json.dumps(job_data), headers=headers) - # Prepare the form data for remote host - with open(file_path, 'rb') as file: - encoder = MultipartEncoder({ - 'file': (os.path.basename(file_path), file, 'application/octet-stream'), - 'json': (None, json.dumps(job_list), 'application/json'), - }) + # Prepare the form data for remote host + with open(file_path, 'rb') as file: + encoder = MultipartEncoder({ + 'file': (os.path.basename(file_path), file, 'application/octet-stream'), + 'json': (None, json.dumps(job_data), 'application/json'), + }) - # Create a monitor that will track the upload progress - monitor = MultipartEncoderMonitor(encoder, callback) if callback else MultipartEncoderMonitor(encoder) - headers = {'Content-Type': monitor.content_type} - url = urljoin(f'http://{self.hostname}:{self.port}', '/api/add_job') + # Create a monitor that will track the upload progress + monitor = MultipartEncoderMonitor(encoder, callback) if callback else MultipartEncoderMonitor(encoder) + headers = {'Content-Type': monitor.content_type} + url = urljoin(f'http://{self.hostname}:{self.port}', '/api/add_job') - # Send the request with proper resource management - with requests.post(url, data=monitor, headers=headers) as response: - return response - - except requests.ConnectionError as e: - logger.error(f"Connection error: {e}") - except Exception as e: - logger.error(f"An error occurred: {e}") + # Send the request with proper resource management + with requests.post(url, data=monitor, headers=headers) as response: + return response def cancel_job(self, job_id, confirm=False): return self.request_data(f'job/{job_id}/cancel?confirm={confirm}') diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py index 1bfab6e..1dc8c5d 100644 --- a/src/distributed_job_manager.py +++ b/src/distributed_job_manager.py @@ -156,7 +156,7 @@ class DistributedJobManager: logger.debug(f"New job output path: {output_path}") # create & configure jobs - worker = EngineManager.create_worker(engine_name=new_job_attributes['engine'], + worker = EngineManager.create_worker(engine_name=new_job_attributes['engine_name'], input_path=loaded_project_local_path, output_path=output_path, engine_version=new_job_attributes.get('engine_version'), @@ -358,7 +358,7 @@ class DistributedJobManager: logger.debug(f"Posting subjob with frames {subjob['start_frame']}-" f"{subjob['end_frame']} to {server_hostname}") post_results = RenderServerProxy(server_hostname).post_job_to_server( - file_path=project_path, job_list=[subjob]) + file_path=project_path, job_data=subjob) return post_results # -------------------------------------------- diff --git a/src/ui/add_job_window.py b/src/ui/add_job_window.py index 745a9a4..07a1ad4 100644 --- a/src/ui/add_job_window.py +++ b/src/ui/add_job_window.py @@ -450,7 +450,7 @@ class SubmitWorker(QThread): try: hostname = self.window.server_input.currentText() job_json = {'owner': psutil.Process().username() + '@' + socket.gethostname(), - 'engine': self.window.engine_type.currentText().lower(), + 'engine_name': self.window.engine_type.currentText().lower(), 'engine_version': self.window.engine_version_combo.currentText(), 'args': {'raw': self.window.raw_args.text(), 'export_format': self.window.file_format_combo.currentText()}, @@ -485,26 +485,26 @@ class SubmitWorker(QThread): # process cameras into nested format input_path = self.window.scene_file_input.text() - if selected_cameras: - job_list = [] + if selected_cameras and self.window.cameras_list.count() > 1: + children_jobs = [] for cam in selected_cameras: - job_copy = copy.deepcopy(job_json) - job_copy['args']['camera'] = cam - job_copy['name'] = job_copy['name'].replace(' ', '-') + "_" + cam.replace(' ', '') - job_copy['output_path'] = job_copy['name'] - job_list.append(job_copy) - else: - job_list = [job_json] + child_job_data = dict() + child_job_data['args'] = {} + child_job_data['args']['camera'] = cam + child_job_data['name'] = job_json['name'].replace(' ', '-') + "_" + cam.replace(' ', '') + child_job_data['output_path'] = child_job_data['name'] + children_jobs.append(child_job_data) + job_json['child_jobs'] = children_jobs # presubmission tasks engine = EngineManager.engine_with_name(self.window.engine_type.currentText().lower()) input_path = engine().perform_presubmission_tasks(input_path) # submit err_msg = "" - result = self.window.server_proxy.post_job_to_server(file_path=input_path, job_list=job_list, + result = self.window.server_proxy.post_job_to_server(file_path=input_path, job_data=job_json, callback=create_callback) if not (result and result.ok): - err_msg = "Error posting job to server." + err_msg = f"Error posting job to server: {result.message}" self.message_signal.emit(err_msg) diff --git a/src/utilities/misc_helper.py b/src/utilities/misc_helper.py index c68fdec..9cc69a2 100644 --- a/src/utilities/misc_helper.py +++ b/src/utilities/misc_helper.py @@ -211,7 +211,7 @@ def check_for_updates(repo_name, repo_owner, app_name, current_version): releases = get_github_releases(repo_owner, repo_name) if not releases: - return + return None latest_version = releases[0] latest_version_tag = latest_version['tag_name'] @@ -221,7 +221,7 @@ def check_for_updates(repo_name, repo_owner, app_name, current_version): logger.info(f"Newer version of {app_name} available. " f"Latest: {latest_version_tag}, Current: {current_version}") return latest_version - + return None def is_localhost(comparison_hostname): # this is necessary because socket.gethostname() does not always include '.local' - This is a sanitized comparison