Split add_job_handler helpers out into add_job_helpers.py

Brett Williams
2023-10-24 20:53:06 -05:00
parent 782a1a4699
commit 03e7b95e1b
2 changed files with 185 additions and 147 deletions

src/api/add_job_helpers.py (new file, +175)

@@ -0,0 +1,175 @@
#!/usr/bin/env python3
import logging
import os
import shutil
import tempfile
import zipfile
from datetime import datetime
import requests
from tqdm import tqdm
from werkzeug.utils import secure_filename
from src.distributed_job_manager import DistributedJobManager
from src.engines.core.worker_factory import RenderWorkerFactory
from src.render_queue import RenderQueue
logger = logging.getLogger()

def handle_uploaded_project_files(request, jobs_list, upload_directory):
    """Resolve the incoming project file (upload, URL, or local path), copy it into
    a fresh job directory under upload_directory, and return
    (loaded_project_local_path, referred_name, job_dir)."""
    # Initialize default values
    loaded_project_local_path = None
    uploaded_project = request.files.get('file', None)
    project_url = jobs_list[0].get('url', None)
    local_path = jobs_list[0].get('local_path', None)
    renderer = jobs_list[0].get('renderer')
    downloaded_file_path = None
    if uploaded_project and uploaded_project.filename:
        referred_name = os.path.basename(uploaded_project.filename)
    elif project_url:
        referred_name, downloaded_file_path = download_project_from_url(project_url)
        if not referred_name:
            raise ValueError(f"Error downloading file from URL: {project_url}")
    elif local_path and os.path.exists(local_path):
        referred_name = os.path.basename(local_path)
    else:
        raise ValueError("Cannot find any valid project paths")
    # Prepare the local filepath
    cleaned_path_name = os.path.splitext(referred_name)[0].replace(' ', '_')
    job_dir = os.path.join(upload_directory, '-'.join(
        [datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), renderer, cleaned_path_name]))
    os.makedirs(job_dir, exist_ok=True)
    upload_dir = os.path.join(job_dir, 'source')
    os.makedirs(upload_dir, exist_ok=True)
    # Move projects to their work directories
    if uploaded_project and uploaded_project.filename:
        loaded_project_local_path = os.path.join(upload_dir, secure_filename(uploaded_project.filename))
        uploaded_project.save(loaded_project_local_path)
        logger.info(f"Transfer complete for {loaded_project_local_path.split(upload_directory)[-1]}")
    elif project_url:
        loaded_project_local_path = os.path.join(upload_dir, referred_name)
        shutil.move(downloaded_file_path, loaded_project_local_path)
        logger.info(f"Download complete for {loaded_project_local_path.split(upload_directory)[-1]}")
    elif local_path:
        loaded_project_local_path = os.path.join(upload_dir, referred_name)
        shutil.copy(local_path, loaded_project_local_path)
        logger.info(f"Import complete for {loaded_project_local_path.split(upload_directory)[-1]}")
    return loaded_project_local_path, referred_name, job_dir

def download_project_from_url(project_url):
    # Download a project file from a URL into the temp dir; returns
    # (filename, local_path), or (None, None) on failure.
    logger.info(f"Downloading project from url: {project_url}")
    referred_name = os.path.basename(project_url)
    downloaded_file_path = None
    try:
        response = requests.get(project_url, stream=True)
        if response.status_code == 200:
            # Get the total file size from the "Content-Length" header
            file_size = int(response.headers.get("Content-Length", 0))
            # Create a progress bar using tqdm
            progress_bar = tqdm(total=file_size, unit="B", unit_scale=True)
            # Open a file for writing in binary mode
            downloaded_file_path = os.path.join(tempfile.gettempdir(), referred_name)
            with open(downloaded_file_path, "wb") as file:
                for chunk in response.iter_content(chunk_size=1024):
                    if chunk:
                        # Write the chunk to the file
                        file.write(chunk)
                        # Update the progress bar
                        progress_bar.update(len(chunk))
            # Close the progress bar
            progress_bar.close()
        else:
            return None, None
    except Exception as e:
        logger.error(f"Error downloading file: {e}")
        return None, None
    return referred_name, downloaded_file_path

def process_zipped_project(zip_path):
    # Given a zip path, extract its contents and return the main project file path
    work_path = os.path.dirname(zip_path)
    try:
        with zipfile.ZipFile(zip_path, 'r') as myzip:
            myzip.extractall(work_path)
        project_files = [x for x in os.listdir(work_path) if os.path.isfile(os.path.join(work_path, x))]
        project_files = [x for x in project_files if '.zip' not in x]
        logger.debug(f"Zip files: {project_files}")
        # supported_exts = RenderWorkerFactory.class_for_name(renderer).engine.supported_extensions
        # if supported_exts:
        #     project_files = [file for file in project_files if any(file.endswith(ext) for ext in supported_exts)]
        # If there's more than one project file, or none, raise an error
        if len(project_files) != 1:
            raise ValueError(f'Cannot find a valid project file in {os.path.basename(zip_path)}')
        extracted_project_path = os.path.join(work_path, project_files[0])
        logger.info(f"Extracted zip file to {extracted_project_path}")
    except (zipfile.BadZipFile, zipfile.LargeZipFile) as e:
        logger.error(f"Error processing zip file: {e}")
        raise ValueError(f"Error processing zip file: {e}")
    return extracted_project_path

def create_render_jobs(jobs_list, loaded_project_local_path, job_dir, enable_split_jobs=False):
    results = []
    for job_data in jobs_list:
        try:
            # prepare output paths
            output_dir = os.path.join(job_dir, job_data.get('name') if len(jobs_list) > 1 else 'output')
            os.makedirs(output_dir, exist_ok=True)
            # get new output path in output_dir
            job_data['output_path'] = os.path.join(output_dir, os.path.basename(
                job_data.get('output_path', None) or loaded_project_local_path
            ))
            # create & configure jobs
            worker = RenderWorkerFactory.create_worker(renderer=job_data['renderer'],
                                                       input_path=loaded_project_local_path,
                                                       output_path=job_data["output_path"],
                                                       engine_version=job_data.get('engine_version'),
                                                       args=job_data.get('args', {}))
            worker.status = job_data.get("initial_status", worker.status)
            worker.parent = job_data.get("parent", worker.parent)
            worker.name = job_data.get("name", worker.name)
            worker.priority = int(job_data.get('priority', worker.priority))
            worker.start_frame = int(job_data.get("start_frame", worker.start_frame))
            worker.end_frame = int(job_data.get("end_frame", worker.end_frame))
            # determine if we can / should split the job
            if enable_split_jobs and (worker.total_frames > 1) and not worker.parent:
                DistributedJobManager.split_into_subjobs(worker, job_data, loaded_project_local_path)
            RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
            if not worker.parent:
                # deferred import avoids a circular dependency with the API server module
                from src.api.api_server import make_job_ready
                make_job_ready(worker.id)
            results.append(worker.json())
        except FileNotFoundError as e:
            err_msg = f"Cannot create job: {e}"
            logger.error(err_msg)
            results.append({'error': err_msg})
        except Exception as e:
            err_msg = f"Exception creating render job: {e}"
            logger.exception(err_msg)
            results.append({'error': err_msg})
    return results
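
Because these helpers no longer reach into the Flask app object, they can be exercised on their own. Below is a minimal sketch of driving the three functions in sequence, mirroring what the slimmed-down add_job_handler does; the stub request, the local .blend path, and the 'blender' engine name are illustrative assumptions, not part of this commit.

import tempfile
from src.api.add_job_helpers import (handle_uploaded_project_files,
                                     process_zipped_project, create_render_jobs)

class StubRequest:
    # Stands in for flask.request: handle_uploaded_project_files only reads
    # request.files, so an empty mapping works when the job gives a local_path.
    files = {}

jobs_list = [{
    'renderer': 'blender',             # assumed engine name
    'local_path': '/tmp/scene.blend',  # hypothetical existing project file
    'name': 'smoke_test',
}]

upload_root = tempfile.mkdtemp()
project_path, referred_name, job_dir = handle_uploaded_project_files(
    StubRequest(), jobs_list, upload_root)
if project_path.lower().endswith('.zip'):
    project_path = process_zipped_project(project_path)
results = create_render_jobs(jobs_list, project_path, job_dir,
                             enable_split_jobs=False)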

src/api/api_server.py (+10, -147)

@@ -10,19 +10,16 @@ import ssl
 import tempfile
 import threading
 import time
-import zipfile
 from datetime import datetime
 from zipfile import ZipFile
 import json2html
 import psutil
-import requests
 import yaml
 from flask import Flask, request, render_template, send_file, after_this_request, Response, redirect, url_for, abort
-from tqdm import tqdm
-from werkzeug.utils import secure_filename
 from src.api.server_proxy import RenderServerProxy
+from src.api.add_job_helpers import handle_uploaded_project_files, process_zipped_project, create_render_jobs
 from src.distributed_job_manager import DistributedJobManager
 from src.engines.core.base_worker import string_to_status, RenderStatus
 from src.engines.core.worker_factory import RenderWorkerFactory
@@ -283,9 +280,10 @@ def detected_clients():
     return ZeroconfServer.found_clients()
 
 
+# New version
 @server.post('/api/add_job')
 def add_job_handler():
-    # initial handling of raw data
+    # Process request data
     try:
         if request.is_json:
             jobs_list = [request.json] if not isinstance(request.json, list) else request.json
@@ -309,157 +307,21 @@ def add_job_handler():
         logger.error(err_msg)
         return err_msg, 500
 
-    # start handling project files
     try:
-        # handle uploaded files
-        logger.debug(f"Incoming new job request: {jobs_list}")
-        uploaded_project = request.files.get('file', None)
-        project_url = jobs_list[0].get('url', None)
-        local_path = jobs_list[0].get('local_path', None)
-        renderer = jobs_list[0].get('renderer')
-        downloaded_file_url = None
-        if uploaded_project and uploaded_project.filename:
-            referred_name = os.path.basename(uploaded_project.filename)
-        elif project_url:
-            # download and save url - have to download first to know filename due to redirects
-            logger.info(f"Downloading project from url: {project_url}")
-            try:
-                referred_name = os.path.basename(project_url)
-                response = requests.get(project_url, stream=True)
-                if response.status_code == 200:
-                    # Get the total file size from the "Content-Length" header
-                    file_size = int(response.headers.get("Content-Length", 0))
-                    # Create a progress bar using tqdm
-                    progress_bar = tqdm(total=file_size, unit="B", unit_scale=True)
-                    # Open a file for writing in binary mode
-                    downloaded_file_url = os.path.join(tempfile.gettempdir(), referred_name)
-                    with open(downloaded_file_url, "wb") as file:
-                        for chunk in response.iter_content(chunk_size=1024):
-                            if chunk:
-                                # Write the chunk to the file
-                                file.write(chunk)
-                                # Update the progress bar
-                                progress_bar.update(len(chunk))
-                    # Close the progress bar
-                    progress_bar.close()
-            except Exception as e:
-                err_msg = f"Error downloading file: {e}"
-                logger.error(err_msg)
-                return err_msg, 406
-        elif local_path and os.path.exists(local_path):
-            referred_name = os.path.basename(local_path)
-        else:
-            return "Cannot find any valid project paths", 400
-        # prep local filepath
-        cleaned_path_name = os.path.splitext(referred_name)[0].replace(' ', '_')
-        job_dir = os.path.join(server.config['UPLOAD_FOLDER'], '-'.join(
-            [datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), renderer, cleaned_path_name]))
-        os.makedirs(job_dir, exist_ok=True)
-        upload_dir = os.path.join(job_dir, 'source')
-        os.makedirs(upload_dir, exist_ok=True)
-        # move projects to their work directories
-        loaded_project_local_path = None
-        if uploaded_project and uploaded_project.filename:
-            loaded_project_local_path = os.path.join(upload_dir, secure_filename(uploaded_project.filename))
-            uploaded_project.save(loaded_project_local_path)
-            logger.info(f"Transfer complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}")
-        elif project_url:
-            loaded_project_local_path = os.path.join(upload_dir, referred_name)
-            shutil.move(downloaded_file_url, loaded_project_local_path)
-            logger.info(f"Download complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}")
-        elif local_path:
-            loaded_project_local_path = os.path.join(upload_dir, referred_name)
-            shutil.copy(local_path, loaded_project_local_path)
-            logger.info(f"Import complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}")
-        # process uploaded zip files
-        zip_path = loaded_project_local_path if loaded_project_local_path.lower().endswith('.zip') else None
-        if zip_path:
-            zip_path = loaded_project_local_path
-            work_path = os.path.dirname(zip_path)
-            try:
-                with zipfile.ZipFile(zip_path, 'r') as myzip:
-                    myzip.extractall(os.path.dirname(zip_path))
-                project_files = [x for x in os.listdir(work_path) if os.path.isfile(os.path.join(work_path, x))]
-                project_files = [x for x in project_files if '.zip' not in x]
-                supported_exts = RenderWorkerFactory.class_for_name(renderer).engine.supported_extensions
-                if supported_exts:
-                    project_files = [file for file in project_files if
-                                     any(file.endswith(ext) for ext in supported_exts)]
-                if len(project_files) != 1:  # we have to narrow down to 1 main project file, otherwise error
-                    return {'error': f'Cannot find valid project file in {os.path.basename(zip_path)}'}, 400
-                extracted_project_path = os.path.join(work_path, project_files[0])
-                logger.info(f"Extracted zip file to {extracted_project_path}")
-                loaded_project_local_path = extracted_project_path
-            except (zipfile.BadZipFile, zipfile.LargeZipFile) as e:
-                err_msg = f"Error processing zip file: {e}"
-                logger.error(err_msg)
-                return err_msg, 500
-        # create and add jobs to render queue
-        results = []
-        for job_data in jobs_list:
-            try:
-                # prepare output paths
-                output_dir = os.path.join(job_dir, job_data.get('name') if len(jobs_list) > 1 else 'output')
-                os.makedirs(system_safe_path(output_dir), exist_ok=True)
-                # get new output path in output_dir
-                job_data['output_path'] = os.path.join(output_dir, os.path.basename(
-                    job_data.get('output_path', None) or loaded_project_local_path
-                ))
-                # create & configure jobs
-                worker = RenderWorkerFactory.create_worker(renderer=job_data['renderer'],
-                                                           input_path=loaded_project_local_path,
-                                                           output_path=job_data["output_path"],
-                                                           engine_version=job_data.get('engine_version'),
-                                                           args=job_data.get('args', {}))
-                worker.status = job_data.get("initial_status", worker.status)
-                worker.parent = job_data.get("parent", worker.parent)
-                worker.name = job_data.get("name", worker.name)
-                worker.priority = int(job_data.get('priority', worker.priority))
-                worker.start_frame = int(job_data.get("start_frame", worker.start_frame))
-                worker.end_frame = int(job_data.get("end_frame", worker.end_frame))
-                # determine if we can / should split the job
-                if server.config.get('enable_split_jobs', False) and (worker.total_frames > 1) and not worker.parent:
-                    DistributedJobManager.split_into_subjobs(worker, job_data, zip_path or loaded_project_local_path)
-                RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
-                if not worker.parent:
-                    make_job_ready(worker.id)
-                results.append(worker.json())
-            except FileNotFoundError as e:
-                err_msg = f"Cannot create job: {e}"
-                logger.error(err_msg)
-                results.append({'error': err_msg})
-            except Exception as e:
-                err_msg = f"Exception creating render job: {e}"
-                logger.exception(err_msg)
-                results.append({'error': err_msg})
-        # return any errors from results list
+        loaded_project_local_path, referred_name, job_dir = handle_uploaded_project_files(request, jobs_list,
+                                                                                          server.config['UPLOAD_FOLDER'])
+        if loaded_project_local_path.lower().endswith('.zip'):
+            loaded_project_local_path = process_zipped_project(loaded_project_local_path)
+
+        results = create_render_jobs(jobs_list, loaded_project_local_path, job_dir,
+                                     server.config['enable_split_jobs'])
+
         for response in results:
             if response.get('error', None):
                 return results, 400
-        # redirect to index if requested
         if request.args.get('redirect', False):
            return redirect(url_for('index'))
         else:
             return results, 200
     except Exception as e:
         logger.exception(f"Unknown error adding job: {e}")
         return 'unknown error', 500
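
For completeness, a minimal sketch of a client posting to the refactored endpoint. The host and port are hypothetical and 'blender' is an assumed engine name; the field names match what add_job_handler and create_render_jobs read above.

import requests

SERVER = "http://localhost:8080"  # hypothetical host/port

# The handler accepts a single JSON object or a list of them.
job = {
    'renderer': 'blender',                   # assumed engine name
    'url': 'https://example.com/scene.zip',  # fetched server-side, then unzipped
    'name': 'demo_render',
    'priority': 50,
    'start_frame': 1,
    'end_frame': 240,
    'args': {},
    'force_start': False,
}

response = requests.post(f"{SERVER}/api/add_job", json=job)
print(response.status_code, response.json())  # worker JSON list on success, errors with 400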
@@ -574,6 +436,7 @@ def renderer_info():
             'supported_export_formats': engine(install_path).get_output_formats()}
     return renderer_data
 
+
 @server.get('/api/is_engine_available_to_download')
 def is_engine_available_to_download():
     available_result = EngineManager.version_is_available_to_download(request.args.get('engine'),