Submit URLs in new jobs - Also add_job cleanup. (#12)

* Add ability to submit render jobs as URLs

* Considated add_job and add_job_handler

* Cleaned up code flow and added back in support for local file path submissions

* Misc cleanup
This commit is contained in:
2023-06-10 22:55:08 -05:00
committed by GitHub
parent 2763a0c97f
commit 86a1dae5b6

View File

@@ -15,6 +15,8 @@ import json2html
import requests import requests
import yaml import yaml
from flask import Flask, request, render_template, send_file, after_this_request, Response, redirect, url_for, abort from flask import Flask, request, render_template, send_file, after_this_request, Response, redirect, url_for, abort
from urllib.parse import urlparse
from urllib.request import urlretrieve
from werkzeug.utils import secure_filename from werkzeug.utils import secure_filename
from lib.server.zeroconf_server import ZeroconfServer from lib.server.zeroconf_server import ZeroconfServer
@@ -269,8 +271,9 @@ def snapshot():
@server.post('/api/add_job') @server.post('/api/add_job')
def add_job_handler(): def add_job_handler():
# initial handling of raw data
try: try:
"""Create new job and add to server render queue"""
if request.is_json: if request.is_json:
jobs_list = [request.json] jobs_list = [request.json]
elif request.form.get('json', None): elif request.form.get('json', None):
@@ -288,157 +291,128 @@ def add_job_handler():
args['raw'] = form_dict.get('raw_args', None) args['raw'] = form_dict.get('raw_args', None)
form_dict['args'] = args form_dict['args'] = args
jobs_list = [form_dict] jobs_list = [form_dict]
except Exception as e:
err_msg = f"Error processing job data: {e}"
logger.error(err_msg)
return err_msg, 500
# start handling project files
try:
# handle uploaded files # handle uploaded files
uploaded_file = request.files.get('file', None) uploaded_project = request.files.get('file', None)
uploaded_file_local_path = None project_url = jobs_list[0].get('url', None)
job_dir = None input_path = jobs_list[0].get('input_path', None)
if uploaded_file and uploaded_file.filename: renderer = jobs_list[0].get('renderer')
logger.info(f"Receiving uploaded file {uploaded_file.filename}")
job_dir = os.path.join(server.config['UPLOAD_FOLDER'], '_'.join(
[datetime.now().strftime("%Y.%m.%d_%H.%M.%S"),
jobs_list[0]['renderer'],
os.path.splitext(uploaded_file.filename)[0]]))
os.makedirs(job_dir, exist_ok=True)
upload_dir = os.path.join(job_dir, 'source') downloaded_file_url = None
os.makedirs(upload_dir, exist_ok=True) if uploaded_project and uploaded_project.filename:
uploaded_file_local_path = os.path.join(upload_dir, secure_filename(uploaded_file.filename)) referred_name = os.path.basename(uploaded_project.filename)
uploaded_file.save(uploaded_file_local_path) elif project_url:
# download and save url - have to download first to know filename due to redirects
logger.info(f"Attempting to download URL: {project_url}")
try:
downloaded_file_url, info = urlretrieve(project_url)
referred_name = info.get_filename()
except Exception as e:
err_msg = f"Error downloading file: {e}"
logger.error(err_msg)
return err_msg, 406
elif input_path and os.path.exists(input_path):
referred_name = os.path.basename(input_path)
else:
return "Cannot find any valid project paths", 400
# convert job input paths for uploaded files and add jobs # prep local filepath
job_dir = os.path.join(server.config['UPLOAD_FOLDER'], '_'.join(
[datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), renderer, os.path.splitext(referred_name)[0]]))
os.makedirs(job_dir, exist_ok=True)
upload_dir = os.path.join(job_dir, 'source')
os.makedirs(upload_dir, exist_ok=True)
# move projects to their work directories
loaded_project_local_path = None
if uploaded_project and uploaded_project.filename:
loaded_project_local_path = os.path.join(upload_dir, secure_filename(uploaded_project.filename))
uploaded_project.save(loaded_project_local_path)
logger.info(f"Transfer complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}")
elif project_url:
loaded_project_local_path = os.path.join(upload_dir, referred_name)
shutil.move(downloaded_file_url, loaded_project_local_path)
logger.info(f"Download complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}")
elif input_path:
loaded_project_local_path = os.path.join(upload_dir, referred_name)
shutil.copy(input_path, loaded_project_local_path)
logger.info(f"Import complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}")
# process uploaded zip files
if loaded_project_local_path.lower().endswith('.zip'):
zip_path = loaded_project_local_path
work_path = os.path.dirname(zip_path)
try:
with zipfile.ZipFile(zip_path, 'r') as myzip:
myzip.extractall(os.path.dirname(zip_path))
project_files = [x for x in os.listdir(work_path) if os.path.isfile(os.path.join(work_path, x))]
project_files = [x for x in project_files if '.zip' not in x]
supported_exts = RenderWorkerFactory.class_for_name(renderer).engine.supported_extensions
if supported_exts:
project_files = [file for file in project_files if
any(file.endswith(ext) for ext in supported_exts)]
if len(project_files) != 1: # we have to narrow down to 1 main project file, otherwise error
return {'error': f'Cannot find valid project file in {os.path.basename(zip_path)}'}, 400
extracted_project_path = os.path.join(work_path, project_files[0])
logger.info(f"Extracted zip file to {extracted_project_path}")
loaded_project_local_path = extracted_project_path
except (zipfile.BadZipFile, zipfile.LargeZipFile) as e:
err_msg = f"Error processing zip file: {e}"
logger.error(err_msg)
return err_msg, 500
# create and add jobs to render queue
results = [] results = []
for job in jobs_list: for job in jobs_list:
if uploaded_file_local_path: try:
job['input_path'] = uploaded_file_local_path # prepare output paths
output_dir = os.path.join(job_dir, job.get('name', None) or 'output') output_dir = os.path.join(job_dir, job.get('name', None) or 'output')
os.makedirs(output_dir, exist_ok=True) os.makedirs(output_dir, exist_ok=True)
job['output_path'] = os.path.join(output_dir, os.path.basename(job.get('name', None) or job['output_path'])) job['output_path'] = os.path.join(output_dir, os.path.basename(job.get('name', None) or
remove_job_dir = len(jobs_list) == 1 and uploaded_file_local_path # remove failed job dir for single file uploads only job['output_path']))
add_result = add_job(job, remove_job_dir_on_failure=remove_job_dir)
results.append(add_result) # create & configure jobs
render_job = RenderWorkerFactory.create_worker(renderer=job['renderer'],
input_path=loaded_project_local_path,
output_path=job["output_path"],
args=job.get('args', {}))
render_job.client = job.get('client', None) or RenderQueue.hostname
render_job.owner = job.get("owner", None)
render_job.name = job.get("name", None)
render_job.priority = int(job.get('priority', render_job.priority))
RenderQueue.add_to_render_queue(render_job, force_start=job.get('force_start', False))
results.append(render_job.json())
except Exception as e:
err_msg = f"Error creating render job: {e}"
logger.error(err_msg)
results.append({'error': err_msg})
# return any errors from results list # return any errors from results list
for response in results: for response in results:
if response.get('error', None): if response.get('error', None):
if len(results) == 1: return results, 400
return results, response.get('code', 500)
else:
return results, 400
# redirect to index if requested
if request.args.get('redirect', False): if request.args.get('redirect', False):
return redirect(url_for('index')) return redirect(url_for('index'))
else: else:
return results return results, 200
except Exception as e: except Exception as e:
logger.exception(f"Unknown error adding job: {e}") logger.exception(f"Unknown error adding job: {e}")
return 'unknown error', 500 return 'unknown error', 500
def add_job(job_params, remove_job_dir_on_failure=False):
def remove_job_dir():
if remove_job_dir_on_failure and job_dir and os.path.exists(job_dir):
logger.debug(f"Removing job dir: {job_dir}")
shutil.rmtree(job_dir)
name = job_params.get("name", None)
job_owner = job_params.get("owner", None)
renderer = job_params.get("renderer", None)
input_path = job_params.get("input_path", None)
output_path = job_params.get("output_path", None)
priority = int(job_params.get('priority', 2))
args = job_params.get('args', {})
client = job_params.get('client', None) or RenderQueue.hostname
force_start = job_params.get('force_start', False)
job_dir = None # todo: I dont think this gets set anywhere
# check for minimum render requirements
if None in [renderer, input_path, output_path]:
err_msg = 'Cannot add job: Missing required parameters'
logger.error(err_msg)
return {'error': err_msg, 'code': 400}
# process uploaded zip files
if '.zip' in input_path:
zip_path = input_path
work_path = os.path.dirname(zip_path)
try:
with zipfile.ZipFile(zip_path, 'r') as myzip:
myzip.extractall(os.path.dirname(zip_path))
project_files = [x for x in os.listdir(work_path) if os.path.isfile(os.path.join(work_path, x))]
project_files = [x for x in project_files if '.zip' not in x]
supported_exts = RenderWorkerFactory.class_for_name(renderer).engine.supported_extensions
if supported_exts:
project_files = [file for file in project_files if any(file.endswith(ext) for ext in supported_exts)]
if len(project_files) != 1: # we have to narrow down to 1 main project file, otherwise error
return {'error': f'Cannot find valid project file in {os.path.basename(zip_path)}'}, 400
extracted_project_path = os.path.join(work_path, project_files[0])
logger.info(f"Extracted zip file to {extracted_project_path}")
input_path = extracted_project_path
except (zipfile.BadZipFile, zipfile.LargeZipFile) as e:
err_msg = f"Error processing zip file: {e}"
logger.error(err_msg)
return {'error': err_msg, 'code': 400}
finally:
os.remove(zip_path)
logger.info(f"Removed zip: {zip_path}")
# local renders
if client == RenderQueue.hostname:
logger.info(f"Creating job locally - {name if name else input_path}")
try:
render_job = RenderWorkerFactory.create_worker(renderer=renderer, input_path=input_path,
output_path=output_path, args=args)
render_job.client = client
render_job.owner = job_owner
render_job.name = name
render_job.priority = priority
RenderQueue.add_to_render_queue(render_job, force_start=force_start)
return render_job.json()
except Exception as e:
err_msg = f"Error creating job: {str(e)}"
logger.exception(err_msg)
remove_job_dir()
return {'error': err_msg, 'code': 400}
# client renders
elif client in RenderQueue.render_clients():
if client.is_available():
# call uploader on remote client
try:
logger.info(f"Uploading file {input_path} to client {client}")
job_data = request.json
response = RenderServerProxy(hostname=client.hostname).post_job_to_server(input_path, job_data)
if response.ok:
logger.info("Job submitted successfully!")
return response.json() if response.json() else "Job ok"
else:
remove_job_dir()
return {'error': "Job rejected by client", 'code': 400}
except requests.ConnectionError as e:
err_msg = f"Error submitting job to client: {client}"
logger.error(err_msg)
remove_job_dir()
return {'error': err_msg, 'code': 500}
else:
# client is not available
err_msg = f"Render client '{client}' is unreachable"
logger.error(err_msg)
remove_job_dir()
return {'error': err_msg, 'code': 503}
else:
err_msg = f"Unknown render client: '{client}'"
logger.error(err_msg)
remove_job_dir()
return {'error': err_msg, 'code': 400}
@server.get('/api/job/<job_id>/cancel') @server.get('/api/job/<job_id>/cancel')
def cancel_job(job_id): def cancel_job(job_id):
if not request.args.get('confirm', False): if not request.args.get('confirm', False):