From 86a1dae5b65c268bc3f4033067b68dee935f6ec5 Mon Sep 17 00:00:00 2001 From: Brett Williams Date: Sat, 10 Jun 2023 22:55:08 -0500 Subject: [PATCH] Submit URLs in new jobs - Also add_job cleanup. (#12) * Add ability to submit render jobs as URLs * Considated add_job and add_job_handler * Cleaned up code flow and added back in support for local file path submissions * Misc cleanup --- lib/server/job_server.py | 234 +++++++++++++++++---------------------- 1 file changed, 104 insertions(+), 130 deletions(-) diff --git a/lib/server/job_server.py b/lib/server/job_server.py index 14124ad..e2614ee 100755 --- a/lib/server/job_server.py +++ b/lib/server/job_server.py @@ -15,6 +15,8 @@ import json2html import requests import yaml from flask import Flask, request, render_template, send_file, after_this_request, Response, redirect, url_for, abort +from urllib.parse import urlparse +from urllib.request import urlretrieve from werkzeug.utils import secure_filename from lib.server.zeroconf_server import ZeroconfServer @@ -269,8 +271,9 @@ def snapshot(): @server.post('/api/add_job') def add_job_handler(): + + # initial handling of raw data try: - """Create new job and add to server render queue""" if request.is_json: jobs_list = [request.json] elif request.form.get('json', None): @@ -288,157 +291,128 @@ def add_job_handler(): args['raw'] = form_dict.get('raw_args', None) form_dict['args'] = args jobs_list = [form_dict] + except Exception as e: + err_msg = f"Error processing job data: {e}" + logger.error(err_msg) + return err_msg, 500 + # start handling project files + try: # handle uploaded files - uploaded_file = request.files.get('file', None) - uploaded_file_local_path = None - job_dir = None - if uploaded_file and uploaded_file.filename: - logger.info(f"Receiving uploaded file {uploaded_file.filename}") - job_dir = os.path.join(server.config['UPLOAD_FOLDER'], '_'.join( - [datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), - jobs_list[0]['renderer'], - os.path.splitext(uploaded_file.filename)[0]])) - os.makedirs(job_dir, exist_ok=True) + uploaded_project = request.files.get('file', None) + project_url = jobs_list[0].get('url', None) + input_path = jobs_list[0].get('input_path', None) + renderer = jobs_list[0].get('renderer') - upload_dir = os.path.join(job_dir, 'source') - os.makedirs(upload_dir, exist_ok=True) - uploaded_file_local_path = os.path.join(upload_dir, secure_filename(uploaded_file.filename)) - uploaded_file.save(uploaded_file_local_path) + downloaded_file_url = None + if uploaded_project and uploaded_project.filename: + referred_name = os.path.basename(uploaded_project.filename) + elif project_url: + # download and save url - have to download first to know filename due to redirects + logger.info(f"Attempting to download URL: {project_url}") + try: + downloaded_file_url, info = urlretrieve(project_url) + referred_name = info.get_filename() + except Exception as e: + err_msg = f"Error downloading file: {e}" + logger.error(err_msg) + return err_msg, 406 + elif input_path and os.path.exists(input_path): + referred_name = os.path.basename(input_path) + else: + return "Cannot find any valid project paths", 400 - # convert job input paths for uploaded files and add jobs + # prep local filepath + job_dir = os.path.join(server.config['UPLOAD_FOLDER'], '_'.join( + [datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), renderer, os.path.splitext(referred_name)[0]])) + os.makedirs(job_dir, exist_ok=True) + upload_dir = os.path.join(job_dir, 'source') + os.makedirs(upload_dir, exist_ok=True) + + # move projects to their work directories + loaded_project_local_path = None + if uploaded_project and uploaded_project.filename: + loaded_project_local_path = os.path.join(upload_dir, secure_filename(uploaded_project.filename)) + uploaded_project.save(loaded_project_local_path) + logger.info(f"Transfer complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}") + elif project_url: + loaded_project_local_path = os.path.join(upload_dir, referred_name) + shutil.move(downloaded_file_url, loaded_project_local_path) + logger.info(f"Download complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}") + elif input_path: + loaded_project_local_path = os.path.join(upload_dir, referred_name) + shutil.copy(input_path, loaded_project_local_path) + logger.info(f"Import complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}") + + # process uploaded zip files + if loaded_project_local_path.lower().endswith('.zip'): + zip_path = loaded_project_local_path + work_path = os.path.dirname(zip_path) + try: + with zipfile.ZipFile(zip_path, 'r') as myzip: + myzip.extractall(os.path.dirname(zip_path)) + + project_files = [x for x in os.listdir(work_path) if os.path.isfile(os.path.join(work_path, x))] + project_files = [x for x in project_files if '.zip' not in x] + supported_exts = RenderWorkerFactory.class_for_name(renderer).engine.supported_extensions + if supported_exts: + project_files = [file for file in project_files if + any(file.endswith(ext) for ext in supported_exts)] + + if len(project_files) != 1: # we have to narrow down to 1 main project file, otherwise error + return {'error': f'Cannot find valid project file in {os.path.basename(zip_path)}'}, 400 + + extracted_project_path = os.path.join(work_path, project_files[0]) + logger.info(f"Extracted zip file to {extracted_project_path}") + loaded_project_local_path = extracted_project_path + except (zipfile.BadZipFile, zipfile.LargeZipFile) as e: + err_msg = f"Error processing zip file: {e}" + logger.error(err_msg) + return err_msg, 500 + + # create and add jobs to render queue results = [] for job in jobs_list: - if uploaded_file_local_path: - job['input_path'] = uploaded_file_local_path + try: + # prepare output paths output_dir = os.path.join(job_dir, job.get('name', None) or 'output') os.makedirs(output_dir, exist_ok=True) - job['output_path'] = os.path.join(output_dir, os.path.basename(job.get('name', None) or job['output_path'])) - remove_job_dir = len(jobs_list) == 1 and uploaded_file_local_path # remove failed job dir for single file uploads only - add_result = add_job(job, remove_job_dir_on_failure=remove_job_dir) - results.append(add_result) + job['output_path'] = os.path.join(output_dir, os.path.basename(job.get('name', None) or + job['output_path'])) + + # create & configure jobs + render_job = RenderWorkerFactory.create_worker(renderer=job['renderer'], + input_path=loaded_project_local_path, + output_path=job["output_path"], + args=job.get('args', {})) + render_job.client = job.get('client', None) or RenderQueue.hostname + render_job.owner = job.get("owner", None) + render_job.name = job.get("name", None) + render_job.priority = int(job.get('priority', render_job.priority)) + + RenderQueue.add_to_render_queue(render_job, force_start=job.get('force_start', False)) + results.append(render_job.json()) + except Exception as e: + err_msg = f"Error creating render job: {e}" + logger.error(err_msg) + results.append({'error': err_msg}) # return any errors from results list for response in results: if response.get('error', None): - if len(results) == 1: - return results, response.get('code', 500) - else: - return results, 400 + return results, 400 + # redirect to index if requested if request.args.get('redirect', False): return redirect(url_for('index')) else: - return results + return results, 200 except Exception as e: logger.exception(f"Unknown error adding job: {e}") return 'unknown error', 500 -def add_job(job_params, remove_job_dir_on_failure=False): - def remove_job_dir(): - if remove_job_dir_on_failure and job_dir and os.path.exists(job_dir): - logger.debug(f"Removing job dir: {job_dir}") - shutil.rmtree(job_dir) - - name = job_params.get("name", None) - job_owner = job_params.get("owner", None) - renderer = job_params.get("renderer", None) - input_path = job_params.get("input_path", None) - output_path = job_params.get("output_path", None) - priority = int(job_params.get('priority', 2)) - args = job_params.get('args', {}) - client = job_params.get('client', None) or RenderQueue.hostname - force_start = job_params.get('force_start', False) - job_dir = None # todo: I dont think this gets set anywhere - - # check for minimum render requirements - if None in [renderer, input_path, output_path]: - err_msg = 'Cannot add job: Missing required parameters' - logger.error(err_msg) - return {'error': err_msg, 'code': 400} - - # process uploaded zip files - if '.zip' in input_path: - zip_path = input_path - work_path = os.path.dirname(zip_path) - try: - with zipfile.ZipFile(zip_path, 'r') as myzip: - myzip.extractall(os.path.dirname(zip_path)) - - project_files = [x for x in os.listdir(work_path) if os.path.isfile(os.path.join(work_path, x))] - project_files = [x for x in project_files if '.zip' not in x] - supported_exts = RenderWorkerFactory.class_for_name(renderer).engine.supported_extensions - if supported_exts: - project_files = [file for file in project_files if any(file.endswith(ext) for ext in supported_exts)] - - if len(project_files) != 1: # we have to narrow down to 1 main project file, otherwise error - return {'error': f'Cannot find valid project file in {os.path.basename(zip_path)}'}, 400 - - extracted_project_path = os.path.join(work_path, project_files[0]) - logger.info(f"Extracted zip file to {extracted_project_path}") - input_path = extracted_project_path - except (zipfile.BadZipFile, zipfile.LargeZipFile) as e: - err_msg = f"Error processing zip file: {e}" - logger.error(err_msg) - return {'error': err_msg, 'code': 400} - finally: - os.remove(zip_path) - logger.info(f"Removed zip: {zip_path}") - - # local renders - if client == RenderQueue.hostname: - logger.info(f"Creating job locally - {name if name else input_path}") - try: - render_job = RenderWorkerFactory.create_worker(renderer=renderer, input_path=input_path, - output_path=output_path, args=args) - render_job.client = client - render_job.owner = job_owner - render_job.name = name - render_job.priority = priority - - RenderQueue.add_to_render_queue(render_job, force_start=force_start) - return render_job.json() - except Exception as e: - err_msg = f"Error creating job: {str(e)}" - logger.exception(err_msg) - remove_job_dir() - return {'error': err_msg, 'code': 400} - - # client renders - elif client in RenderQueue.render_clients(): - if client.is_available(): - # call uploader on remote client - try: - logger.info(f"Uploading file {input_path} to client {client}") - job_data = request.json - response = RenderServerProxy(hostname=client.hostname).post_job_to_server(input_path, job_data) - if response.ok: - logger.info("Job submitted successfully!") - return response.json() if response.json() else "Job ok" - else: - remove_job_dir() - return {'error': "Job rejected by client", 'code': 400} - except requests.ConnectionError as e: - err_msg = f"Error submitting job to client: {client}" - logger.error(err_msg) - remove_job_dir() - return {'error': err_msg, 'code': 500} - else: - # client is not available - err_msg = f"Render client '{client}' is unreachable" - logger.error(err_msg) - remove_job_dir() - return {'error': err_msg, 'code': 503} - - else: - err_msg = f"Unknown render client: '{client}'" - logger.error(err_msg) - remove_job_dir() - return {'error': err_msg, 'code': 400} - - @server.get('/api/job//cancel') def cancel_job(job_id): if not request.args.get('confirm', False):