From 3ee3dfd4e2b0f63328a9851e6b125ee97677d6ec Mon Sep 17 00:00:00 2001 From: Brett Williams Date: Tue, 25 Oct 2022 11:34:35 -0700 Subject: [PATCH] Rename RenderManager to RenderQueue --- lib/{render_manager.py => render_queue.py} | 20 ++--- server.py | 92 +++++++++++++--------- 2 files changed, 66 insertions(+), 46 deletions(-) rename lib/{render_manager.py => render_queue.py} (94%) diff --git a/lib/render_manager.py b/lib/render_queue.py similarity index 94% rename from lib/render_manager.py rename to lib/render_queue.py index efe7dd7..b96a025 100755 --- a/lib/render_manager.py +++ b/lib/render_queue.py @@ -16,8 +16,8 @@ JSON_FILE = 'server_state.json' #todo: move history to sqlite db -class RenderManager: - render_queue = [] +class RenderQueue: + job_queue = [] render_clients = [] maximum_renderer_instances = {'Blender': 2, 'After Effects': 1, 'ffmpeg': 4} host_name = None @@ -36,7 +36,7 @@ class RenderManager: if not client or render_job.client == cls.host_name: logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job.render)) render_job.client = cls.host_name - cls.render_queue.append(render_job) + cls.job_queue.append(render_job) if force_start: cls.start_job(render_job) else: @@ -57,7 +57,7 @@ class RenderManager: @classmethod def jobs_with_status(cls, status, priority_sorted=False, include_archived=True): - found_jobs = [x for x in cls.render_queue if x.render_status() == status] + found_jobs = [x for x in cls.job_queue if x.render_status() == status] if not include_archived: found_jobs = [x for x in found_jobs if not x.archived] if priority_sorted: @@ -66,13 +66,13 @@ class RenderManager: @classmethod def job_with_id(cls, job_id): - found_job = next((x for x in cls.render_queue if x.id == job_id), None) + found_job = next((x for x in cls.job_queue if x.id == job_id), None) return found_job @classmethod def clear_history(cls): - to_remove = [x for x in cls.render_queue if x.render_status() in [RenderStatus.CANCELLED, - RenderStatus.COMPLETED, RenderStatus.ERROR]] + to_remove = [x for x in cls.job_queue if x.render_status() in [RenderStatus.CANCELLED, + RenderStatus.COMPLETED, RenderStatus.ERROR]] for x in to_remove: x.archived = True cls.save_state() @@ -118,7 +118,7 @@ class RenderManager: new_job.render.status = RenderStatus.CANCELLED # finally add back to render queue - cls.render_queue.append(new_job) + cls.job_queue.append(new_job) cls.last_saved_counts = cls.job_counts() @@ -128,7 +128,7 @@ class RenderManager: try: logger.debug("Saving Render History") output = {'timestamp': datetime.now().isoformat(), - 'jobs': [json.loads(j.json()) for j in cls.render_queue], + 'jobs': [json.loads(j.json()) for j in cls.job_queue], 'clients': cls.render_clients} output_path = json_path or JSON_FILE with open(output_path, 'w') as f: @@ -209,7 +209,7 @@ class RenderManager: @classmethod def all_jobs(cls): - all_jobs = [x for x in cls.render_queue if not x.archived] + all_jobs = [x for x in cls.job_queue if not x.archived] return all_jobs @classmethod diff --git a/server.py b/server.py index c86bdc6..04855de 100755 --- a/server.py +++ b/server.py @@ -3,6 +3,7 @@ import json import logging import os +import shutil import socket import threading import time @@ -14,7 +15,7 @@ from flask import Flask, request, render_template from werkzeug.utils import secure_filename from lib.render_job import RenderJob -from lib.render_manager import RenderManager +from lib.render_queue import RenderQueue from utilities.render_worker import RenderWorkerFactory, string_to_status logger = logging.getLogger() @@ -23,13 +24,13 @@ app = Flask(__name__) @app.get('/jobs') def jobs_json(): - return [json.loads(x.json()) for x in RenderManager.render_queue if not x.archived] + return [json.loads(x.json()) for x in RenderQueue.job_queue if not x.archived] @app.get('/jobs/') def filtered_jobs_json(status_val): state = string_to_status(status_val) - jobs = [json.loads(x.json()) for x in RenderManager.jobs_with_status(state)] + jobs = [json.loads(x.json()) for x in RenderQueue.jobs_with_status(state)] if jobs: return jobs else: @@ -38,7 +39,7 @@ def filtered_jobs_json(status_val): @app.get('/job_status/') def get_job_status(job_id): - found_job = RenderManager.job_with_id(job_id) + found_job = RenderQueue.job_with_id(job_id) if found_job: logger.info("Founbd jobs") return found_job.json() @@ -49,20 +50,20 @@ def get_job_status(job_id): @app.post('/register_client') def register_client(): client_hostname = request.values['hostname'] - x = RenderManager.register_client(client_hostname) + x = RenderQueue.register_client(client_hostname) return "Success" if x else "Fail" @app.post('/unregister_client') def unregister_client(): client_hostname = request.values['hostname'] - x = RenderManager.unregister_client(client_hostname) + x = RenderQueue.unregister_client(client_hostname) return "Success" if x else "Fail" @app.get('/clients') def render_clients(): - return RenderManager.render_clients + return RenderQueue.render_clients @app.get('/full_status') @@ -70,9 +71,9 @@ def full_status(): full_results = {'timestamp': datetime.now().isoformat(), 'servers': {}} try: - for client_hostname in RenderManager.render_clients: + for client_hostname in RenderQueue.render_clients: is_online = False - if client_hostname == RenderManager.host_name: + if client_hostname == RenderQueue.host_name: snapshot_results = snapshot() is_online = True else: @@ -94,8 +95,8 @@ def full_status(): @app.get('/snapshot') def snapshot(): - server_status = RenderManager.status() - server_jobs = [json.loads(x.json()) for x in RenderManager.render_queue if not x.archived] + server_status = RenderQueue.status() + server_jobs = [json.loads(x.json()) for x in RenderQueue.job_queue if not x.archived] server_data = {'status': server_status, 'jobs': server_jobs, 'timestamp': datetime.now().isoformat()} return server_data @@ -103,6 +104,11 @@ def snapshot(): @app.post('/add_job') def add_job(): + def remove_job_dir(): + if job_dir and os.path.exists(job_dir): + logger.debug(f"Removing job dir: {job_dir}") + shutil.rmtree(job_dir) + try: """Create new job and add to server render queue""" json_string = request.form.get('json', None) @@ -110,28 +116,25 @@ def add_job(): return 'missing json data', 400 request_params = json.loads(json_string) + job_owner = request_params.get("owner", None) renderer = request_params.get("renderer", None) input_path = request_params.get("input", None) output_path = request_params.get("output", None) priority = int(request_params.get('priority', 2)) args = request_params.get('args', {}) - client = request_params.get('client', RenderManager.host_name) + client = request_params.get('client', RenderQueue.host_name) force_start = request_params.get('force_start', False) uploaded_file = request.files.get('file', None) html_origin = request_params.get('origin', None) == 'html' custom_id = None + job_dir = None + # check for minimum render requirements if None in [renderer, input_path or (uploaded_file and uploaded_file.filename), output_path]: err_msg = 'Cannot add job: Missing required parameters' logger.error(err_msg) return err_msg, 400 - # # cleanup args from html form and convert them into an args dict - # for key, val in request_params.items(): - # if key.startswith(renderer): - # cleaned_key = key.split('+')[-1] - # args[cleaned_key] = val - # handle uploaded files if uploaded_file and uploaded_file.filename: logger.info(f"Receiving uploaded file {uploaded_file.filename}") @@ -146,25 +149,26 @@ def add_job(): output_path = os.path.join(job_dir, os.path.basename(output_path)) # local renders - if client == RenderManager.host_name: + if client == RenderQueue.host_name: logger.info(f"Creating job locally - {input_path}") try: render_job = RenderWorkerFactory.create_worker(renderer, input_path, output_path, args) except ValueError as e: err_msg = f"Error creating job: {str(e)}" logger.exception(err_msg) + remove_job_dir() return err_msg, 400 - new_job = RenderJob(render_job, priority=priority, custom_id=custom_id) - RenderManager.add_to_render_queue(new_job, force_start=force_start) + new_job = RenderJob(render_job, priority=priority, owner=job_owner, custom_id=custom_id) + RenderQueue.add_to_render_queue(new_job, force_start=force_start) return new_job.json() # client renders - elif client in RenderManager.render_clients: + elif client in RenderQueue.render_clients: # see if host is available - if RenderManager.is_client_available(client): + if RenderQueue.is_client_available(client): # call uploader on remote client try: @@ -175,24 +179,29 @@ def add_job(): logger.info("Job submitted successfully!") return response.json() if response.json() else "Job ok" else: + remove_job_dir() return 'Job rejected by client', 403 except requests.ConnectionError as e: err_msg = f"Error submitting job to client: {client}" logger.error(err_msg) + remove_job_dir() return err_msg, 500 else: # client is not available err_msg = f"Render client '{client}' is unreachable" logger.error(err_msg) + remove_job_dir() return err_msg, 503 else: err_msg = f"Unknown render client: '{client}'" logger.error(err_msg) + remove_job_dir() return err_msg, 400 except Exception as e: logger.exception(f"Unknown error adding job: {e}") + remove_job_dir() return 'unknown error', 500 @@ -205,25 +214,36 @@ def cancel_job(): elif not confirm: return 'confirmation required', 400 else: - found = [x for x in RenderManager.render_queue if x.id == job_id] + found = [x for x in RenderQueue.job_queue if x.id == job_id] if len(found) > 1: # logger.error('Multiple jobs found for ID {}'.format(job_id)) return f'multiple jobs found for ID {job_id}', 400 elif found: - success = RenderManager.cancel_job(found[0]) + success = RenderQueue.cancel_job(found[0]) return success return 'job not found', 400 @app.get('/clear_history') def clear_history(): - RenderManager.clear_history() + RenderQueue.clear_history() return 'success' @app.route('/status') def status(): - return RenderManager.status() + return RenderQueue.status() + + +@app.get('/renderer_info') +def renderer_info(): + renderer_data = {} + for r in RenderWorkerFactory.supported_renderers(): + renderer_class = RenderWorkerFactory.class_for_name(r) + renderer_data[r] = {'version': renderer_class.version(), + 'supported_extensions': renderer_class.supported_extensions, + 'supported_export_formats': renderer_class.supported_export_formats} + return renderer_data @app.route('/') @@ -233,7 +253,7 @@ def default(): @app.route('/upload') def upload_file_page(): - return render_template('upload.html', render_clients=RenderManager.render_clients, + return render_template('upload.html', render_clients=RenderQueue.render_clients, supported_renderers=RenderWorkerFactory.supported_renderers()) @@ -250,7 +270,7 @@ def start_server(background_thread=False): def eval_loop(delay_sec=1): while True: - RenderManager.evaluate_queue() + RenderQueue.evaluate_queue() time.sleep(delay_sec) with open('config.yaml') as f: @@ -264,28 +284,28 @@ def start_server(background_thread=False): # app.config['RESULT_STATIC_PATH'] = 'static' # Get hostname and render clients - RenderManager.host_name = socket.gethostname() - app.config['HOSTNAME'] = RenderManager.host_name - if not RenderManager.render_clients: - RenderManager.render_clients = [RenderManager.host_name] + RenderQueue.host_name = socket.gethostname() + app.config['HOSTNAME'] = RenderQueue.host_name + if not RenderQueue.render_clients: + RenderQueue.render_clients = [RenderQueue.host_name] # disable most Flask logging flask_log = logging.getLogger('werkzeug') flask_log.setLevel(config.get('flask_log_level', 'ERROR').upper()) # Setup the RenderManager object - RenderManager.load_state() + RenderQueue.load_state() thread = threading.Thread(target=eval_loop, kwargs={'delay_sec': config.get('queue_eval_seconds', 1)}, daemon=True) thread.start() if background_thread: server_thread = threading.Thread( - target=lambda: app.run(host='0.0.0.0', port=RenderManager.port, debug=False, use_reloader=False)) + target=lambda: app.run(host='0.0.0.0', port=RenderQueue.port, debug=False, use_reloader=False)) server_thread.start() server_thread.join() else: - app.run(host='0.0.0.0', port=RenderManager.port, debug=config.get('flask_debug_enable', False), + app.run(host='0.0.0.0', port=RenderQueue.port, debug=config.get('flask_debug_enable', False), use_reloader=False)