From 03e7b95e1bf2b482ec66578549b58f103cc59846 Mon Sep 17 00:00:00 2001 From: Brett Williams Date: Tue, 24 Oct 2023 20:53:06 -0500 Subject: [PATCH] Split add_job_handler out to all_job_helpers.py --- src/api/add_job_helpers.py | 175 +++++++++++++++++++++++++++++++++++++ src/api/api_server.py | 157 +++------------------------------ 2 files changed, 185 insertions(+), 147 deletions(-) create mode 100644 src/api/add_job_helpers.py diff --git a/src/api/add_job_helpers.py b/src/api/add_job_helpers.py new file mode 100644 index 0000000..4edb011 --- /dev/null +++ b/src/api/add_job_helpers.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python3 +import logging +import os +import shutil +import tempfile +import zipfile +from datetime import datetime + +import requests +from tqdm import tqdm +from werkzeug.utils import secure_filename + +from src.distributed_job_manager import DistributedJobManager +from src.engines.core.worker_factory import RenderWorkerFactory +from src.render_queue import RenderQueue + +logger = logging.getLogger() + + +def handle_uploaded_project_files(request, jobs_list, upload_directory): + # Initialize default values + loaded_project_local_path = None + + uploaded_project = request.files.get('file', None) + project_url = jobs_list[0].get('url', None) + local_path = jobs_list[0].get('local_path', None) + renderer = jobs_list[0].get('renderer') + downloaded_file_url = None + + if uploaded_project and uploaded_project.filename: + referred_name = os.path.basename(uploaded_project.filename) + elif project_url: + referred_name, downloaded_file_url = download_project_from_url(project_url) + if not referred_name: + raise ValueError(f"Error downloading file from URL: {project_url}") + elif local_path and os.path.exists(local_path): + referred_name = os.path.basename(local_path) + + else: + raise ValueError("Cannot find any valid project paths") + + # Prepare the local filepath + cleaned_path_name = os.path.splitext(referred_name)[0].replace(' ', '_') + job_dir = os.path.join(upload_directory, '-'.join( + [datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), renderer, cleaned_path_name])) + os.makedirs(job_dir, exist_ok=True) + upload_dir = os.path.join(job_dir, 'source') + os.makedirs(upload_dir, exist_ok=True) + + # Move projects to their work directories + if uploaded_project and uploaded_project.filename: + loaded_project_local_path = os.path.join(upload_dir, secure_filename(uploaded_project.filename)) + uploaded_project.save(loaded_project_local_path) + logger.info(f"Transfer complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}") + elif project_url: + loaded_project_local_path = os.path.join(upload_dir, referred_name) + shutil.move(downloaded_file_url, loaded_project_local_path) + logger.info(f"Download complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}") + elif local_path: + loaded_project_local_path = os.path.join(upload_dir, referred_name) + shutil.copy(local_path, loaded_project_local_path) + logger.info(f"Import complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}") + + return loaded_project_local_path, referred_name + + +def download_project_from_url(project_url): + # This nested function is to handle downloading from a URL + logger.info(f"Downloading project from url: {project_url}") + referred_name = os.path.basename(project_url) + downloaded_file_url = None + + try: + response = requests.get(project_url, stream=True) + if response.status_code == 200: + # Get the total file size from the "Content-Length" header + file_size = int(response.headers.get("Content-Length", 0)) + # Create a progress bar using tqdm + progress_bar = tqdm(total=file_size, unit="B", unit_scale=True) + # Open a file for writing in binary mode + downloaded_file_url = os.path.join(tempfile.gettempdir(), referred_name) + with open(downloaded_file_url, "wb") as file: + for chunk in response.iter_content(chunk_size=1024): + if chunk: + # Write the chunk to the file + file.write(chunk) + # Update the progress bar + progress_bar.update(len(chunk)) + # Close the progress bar + progress_bar.close() + else: + return None, None + except Exception as e: + logger.error(f"Error downloading file: {e}") + return None, None + + return referred_name, downloaded_file_url + + +def process_zipped_project(zip_path): + # Given a zip path, extract its content, and return the main project file path + work_path = os.path.dirname(zip_path) + + try: + with zipfile.ZipFile(zip_path, 'r') as myzip: + myzip.extractall(work_path) + + project_files = [x for x in os.listdir(work_path) if os.path.isfile(os.path.join(work_path, x))] + project_files = [x for x in project_files if '.zip' not in x] + + logger.debug(f"Zip files: {project_files}") + + # supported_exts = RenderWorkerFactory.class_for_name(renderer).engine.supported_extensions + # if supported_exts: + # project_files = [file for file in project_files if any(file.endswith(ext) for ext in supported_exts)] + + # If there's more than 1 project file or none, raise an error + if len(project_files) != 1: + raise ValueError(f'Cannot find a valid project file in {os.path.basename(zip_path)}') + + extracted_project_path = os.path.join(work_path, project_files[0]) + logger.info(f"Extracted zip file to {extracted_project_path}") + + except (zipfile.BadZipFile, zipfile.LargeZipFile) as e: + logger.error(f"Error processing zip file: {e}") + raise ValueError(f"Error processing zip file: {e}") + return extracted_project_path + + +def create_render_jobs(jobs_list, loaded_project_local_path, job_dir, enable_split_jobs=False): + results = [] + + for job_data in jobs_list: + try: + # prepare output paths + output_dir = os.path.join(job_dir, job_data.get('name') if len(jobs_list) > 1 else 'output') + os.makedirs(output_dir, exist_ok=True) + + # get new output path in output_dir + job_data['output_path'] = os.path.join(output_dir, os.path.basename( + job_data.get('output_path', None) or loaded_project_local_path + )) + + # create & configure jobs + worker = RenderWorkerFactory.create_worker(renderer=job_data['renderer'], + input_path=loaded_project_local_path, + output_path=job_data["output_path"], + engine_version=job_data.get('engine_version'), + args=job_data.get('args', {})) + worker.status = job_data.get("initial_status", worker.status) + worker.parent = job_data.get("parent", worker.parent) + worker.name = job_data.get("name", worker.name) + worker.priority = int(job_data.get('priority', worker.priority)) + worker.start_frame = int(job_data.get("start_frame", worker.start_frame)) + worker.end_frame = int(job_data.get("end_frame", worker.end_frame)) + + # determine if we can / should split the job + if enable_split_jobs and (worker.total_frames > 1) and not worker.parent: + DistributedJobManager.split_into_subjobs(worker, job_data, loaded_project_local_path) + + RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False)) + if not worker.parent: + from src.api.api_server import make_job_ready + make_job_ready(worker.id) + results.append(worker.json()) + except FileNotFoundError as e: + err_msg = f"Cannot create job: {e}" + logger.error(err_msg) + results.append({'error': err_msg}) + except Exception as e: + err_msg = f"Exception creating render job: {e}" + logger.exception(err_msg) + results.append({'error': err_msg}) + + return results diff --git a/src/api/api_server.py b/src/api/api_server.py index df38731..461cd6a 100755 --- a/src/api/api_server.py +++ b/src/api/api_server.py @@ -10,19 +10,16 @@ import ssl import tempfile import threading import time -import zipfile from datetime import datetime from zipfile import ZipFile import json2html import psutil -import requests import yaml from flask import Flask, request, render_template, send_file, after_this_request, Response, redirect, url_for, abort -from tqdm import tqdm -from werkzeug.utils import secure_filename from src.api.server_proxy import RenderServerProxy +from src.api.add_job_helpers import handle_uploaded_project_files, process_zipped_project, create_render_jobs from src.distributed_job_manager import DistributedJobManager from src.engines.core.base_worker import string_to_status, RenderStatus from src.engines.core.worker_factory import RenderWorkerFactory @@ -283,9 +280,10 @@ def detected_clients(): return ZeroconfServer.found_clients() +# New version @server.post('/api/add_job') def add_job_handler(): - # initial handling of raw data + # Process request data try: if request.is_json: jobs_list = [request.json] if not isinstance(request.json, list) else request.json @@ -309,157 +307,21 @@ def add_job_handler(): logger.error(err_msg) return err_msg, 500 - # start handling project files try: - # handle uploaded files - logger.debug(f"Incoming new job request: {jobs_list}") - uploaded_project = request.files.get('file', None) - project_url = jobs_list[0].get('url', None) - local_path = jobs_list[0].get('local_path', None) - renderer = jobs_list[0].get('renderer') + loaded_project_local_path, referred_name = handle_uploaded_project_files(request, jobs_list, + server.config['UPLOAD_FOLDER']) + if loaded_project_local_path.lower().endswith('.zip'): + loaded_project_local_path = process_zipped_project(loaded_project_local_path) - downloaded_file_url = None - if uploaded_project and uploaded_project.filename: - referred_name = os.path.basename(uploaded_project.filename) - elif project_url: - # download and save url - have to download first to know filename due to redirects - logger.info(f"Downloading project from url: {project_url}") - try: - referred_name = os.path.basename(project_url) - response = requests.get(project_url, stream=True) - if response.status_code == 200: - # Get the total file size from the "Content-Length" header - file_size = int(response.headers.get("Content-Length", 0)) - - # Create a progress bar using tqdm - progress_bar = tqdm(total=file_size, unit="B", unit_scale=True) - - # Open a file for writing in binary mode - downloaded_file_url = os.path.join(tempfile.gettempdir(), referred_name) - with open(downloaded_file_url, "wb") as file: - for chunk in response.iter_content(chunk_size=1024): - if chunk: - # Write the chunk to the file - file.write(chunk) - # Update the progress bar - progress_bar.update(len(chunk)) - - # Close the progress bar - progress_bar.close() - - except Exception as e: - err_msg = f"Error downloading file: {e}" - logger.error(err_msg) - return err_msg, 406 - elif local_path and os.path.exists(local_path): - referred_name = os.path.basename(local_path) - else: - return "Cannot find any valid project paths", 400 - - # prep local filepath - cleaned_path_name = os.path.splitext(referred_name)[0].replace(' ', '_') - job_dir = os.path.join(server.config['UPLOAD_FOLDER'], '-'.join( - [datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), renderer, cleaned_path_name])) - os.makedirs(job_dir, exist_ok=True) - upload_dir = os.path.join(job_dir, 'source') - os.makedirs(upload_dir, exist_ok=True) - - # move projects to their work directories - loaded_project_local_path = None - if uploaded_project and uploaded_project.filename: - loaded_project_local_path = os.path.join(upload_dir, secure_filename(uploaded_project.filename)) - uploaded_project.save(loaded_project_local_path) - logger.info(f"Transfer complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}") - elif project_url: - loaded_project_local_path = os.path.join(upload_dir, referred_name) - shutil.move(downloaded_file_url, loaded_project_local_path) - logger.info(f"Download complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}") - elif local_path: - loaded_project_local_path = os.path.join(upload_dir, referred_name) - shutil.copy(local_path, loaded_project_local_path) - logger.info(f"Import complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}") - - # process uploaded zip files - zip_path = loaded_project_local_path if loaded_project_local_path.lower().endswith('.zip') else None - if zip_path: - zip_path = loaded_project_local_path - work_path = os.path.dirname(zip_path) - try: - with zipfile.ZipFile(zip_path, 'r') as myzip: - myzip.extractall(os.path.dirname(zip_path)) - - project_files = [x for x in os.listdir(work_path) if os.path.isfile(os.path.join(work_path, x))] - project_files = [x for x in project_files if '.zip' not in x] - supported_exts = RenderWorkerFactory.class_for_name(renderer).engine.supported_extensions - if supported_exts: - project_files = [file for file in project_files if - any(file.endswith(ext) for ext in supported_exts)] - - if len(project_files) != 1: # we have to narrow down to 1 main project file, otherwise error - return {'error': f'Cannot find valid project file in {os.path.basename(zip_path)}'}, 400 - - extracted_project_path = os.path.join(work_path, project_files[0]) - logger.info(f"Extracted zip file to {extracted_project_path}") - loaded_project_local_path = extracted_project_path - except (zipfile.BadZipFile, zipfile.LargeZipFile) as e: - err_msg = f"Error processing zip file: {e}" - logger.error(err_msg) - return err_msg, 500 - - # create and add jobs to render queue - results = [] - for job_data in jobs_list: - try: - # prepare output paths - output_dir = os.path.join(job_dir, job_data.get('name') if len(jobs_list) > 1 else 'output') - os.makedirs(system_safe_path(output_dir), exist_ok=True) - - # get new output path in output_dir - job_data['output_path'] = os.path.join(output_dir, os.path.basename( - job_data.get('output_path', None) or loaded_project_local_path - )) - - # create & configure jobs - worker = RenderWorkerFactory.create_worker(renderer=job_data['renderer'], - input_path=loaded_project_local_path, - output_path=job_data["output_path"], - engine_version=job_data.get('engine_version'), - args=job_data.get('args', {})) - worker.status = job_data.get("initial_status", worker.status) - worker.parent = job_data.get("parent", worker.parent) - worker.name = job_data.get("name", worker.name) - worker.priority = int(job_data.get('priority', worker.priority)) - worker.start_frame = int(job_data.get("start_frame", worker.start_frame)) - worker.end_frame = int(job_data.get("end_frame", worker.end_frame)) - - # determine if we can / should split the job - if server.config.get('enable_split_jobs', False) and (worker.total_frames > 1) and not worker.parent: - DistributedJobManager.split_into_subjobs(worker, job_data, zip_path or loaded_project_local_path) - - RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False)) - if not worker.parent: - make_job_ready(worker.id) - results.append(worker.json()) - except FileNotFoundError as e: - err_msg = f"Cannot create job: {e}" - logger.error(err_msg) - results.append({'error': err_msg}) - except Exception as e: - err_msg = f"Exception creating render job: {e}" - logger.exception(err_msg) - results.append({'error': err_msg}) - - # return any errors from results list + results = create_render_jobs(jobs_list, loaded_project_local_path, referred_name, + server.config['enable_split_jobs']) for response in results: if response.get('error', None): return results, 400 - - # redirect to index if requested if request.args.get('redirect', False): return redirect(url_for('index')) else: return results, 200 - except Exception as e: logger.exception(f"Unknown error adding job: {e}") return 'unknown error', 500 @@ -574,6 +436,7 @@ def renderer_info(): 'supported_export_formats': engine(install_path).get_output_formats()} return renderer_data + @server.get('/api/is_engine_available_to_download') def is_engine_available_to_download(): available_result = EngineManager.version_is_available_to_download(request.args.get('engine'),