From 5534f0b1b20893c46d3446e0a15ad765ab5c7743 Mon Sep 17 00:00:00 2001 From: Brett Williams Date: Wed, 21 Aug 2024 00:35:19 -0500 Subject: [PATCH] Reorganize api_server.py --- src/api/api_server.py | 344 ++++++++++++++++++++++-------------------- 1 file changed, 178 insertions(+), 166 deletions(-) diff --git a/src/api/api_server.py b/src/api/api_server.py index b353520..664c09d 100755 --- a/src/api/api_server.py +++ b/src/api/api_server.py @@ -10,143 +10,84 @@ import ssl import tempfile import time from datetime import datetime -from zipfile import ZipFile import psutil import yaml -from flask import Flask, request, send_file, after_this_request, Response, redirect, url_for, abort +from flask import Flask, request, send_file, after_this_request, Response, redirect, url_for from sqlalchemy.orm.exc import DetachedInstanceError from src.api.add_job_helpers import handle_uploaded_project_files, process_zipped_project from src.api.preview_manager import PreviewManager from src.distributed_job_manager import DistributedJobManager -from src.engines.core.base_worker import string_to_status, RenderStatus from src.engines.engine_manager import EngineManager from src.render_queue import RenderQueue, JobNotFoundError -from src.utilities.benchmark import cpu_benchmark, disk_io_benchmark from src.utilities.config import Config from src.utilities.misc_helper import system_safe_path, current_system_os, current_system_cpu, \ current_system_os_version, num_to_alphanumeric -from src.utilities.zeroconf_server import ZeroconfServer +from src.utilities.status_utils import string_to_status logger = logging.getLogger() server = Flask(__name__) ssl._create_default_https_context = ssl._create_unverified_context # disable SSL for downloads -categories = [RenderStatus.RUNNING, RenderStatus.ERROR, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED, - RenderStatus.COMPLETED, RenderStatus.CANCELLED] + +def start_server(hostname=None): + + # get hostname + if not hostname: + local_hostname = socket.gethostname() + hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "") + + # load flask settings + server.config['HOSTNAME'] = hostname + server.config['PORT'] = int(Config.port_number) + server.config['UPLOAD_FOLDER'] = system_safe_path(os.path.expanduser(Config.upload_folder)) + server.config['MAX_CONTENT_PATH'] = Config.max_content_path + server.config['enable_split_jobs'] = Config.enable_split_jobs + + # disable most Flask logging + flask_log = logging.getLogger('werkzeug') + flask_log.setLevel(Config.flask_log_level.upper()) + + logger.debug('Starting API server') + server.run(host='0.0.0.0', port=server.config['PORT'], debug=Config.flask_debug_enable, use_reloader=False, + threaded=True) -# -- Error Handlers -- - -@server.errorhandler(JobNotFoundError) -def handle_job_not_found(job_error): - return str(job_error), 400 - - -@server.errorhandler(DetachedInstanceError) -def handle_detached_instance(error): - # logger.debug(f"detached instance: {error}") - return "Unavailable", 503 - - -@server.errorhandler(Exception) -def handle_general_error(general_error): - err_msg = f"Server error: {general_error}" - logger.error(err_msg) - return err_msg, 500 - - -# -- Jobs -- - - -def sorted_jobs(all_jobs, sort_by_date=True): - if not sort_by_date: - sorted_job_list = [] - if all_jobs: - for status_category in categories: - found_jobs = [x for x in all_jobs if x.status == status_category.value] - if found_jobs: - sorted_found_jobs = sorted(found_jobs, key=lambda d: d.date_created, reverse=True) - sorted_job_list.extend(sorted_found_jobs) - else: - sorted_job_list = sorted(all_jobs, key=lambda d: d.date_created, reverse=True) - return sorted_job_list - +# -------------------------------------------- +# Get All Jobs +# -------------------------------------------- @server.get('/api/jobs') def jobs_json(): - try: - all_jobs = [x.json() for x in RenderQueue.all_jobs()] - job_cache_int = int(json.dumps(all_jobs).__hash__()) - job_cache_token = num_to_alphanumeric(job_cache_int) - return {'jobs': all_jobs, 'token': job_cache_token} - except DetachedInstanceError as e: - raise e - except Exception as e: - logger.error(f"Error fetching jobs_json: {e}") - raise e + """Retrieves all jobs from the render queue in JSON format. + + This endpoint fetches all jobs currently in the render queue, converts them to JSON format, + and returns them along with a cache token that represents the current state of the job list. + + Returns: + dict: A dictionary containing: + - 'jobs' (list[dict]): A list of job dictionaries, each representing a job in the queue. + - 'token' (str): A cache token generated from the hash of the job list. + """ + all_jobs = [x.json() for x in RenderQueue.all_jobs()] + job_cache_int = int(json.dumps(all_jobs).__hash__()) + job_cache_token = num_to_alphanumeric(job_cache_int) + return {'jobs': all_jobs, 'token': job_cache_token} @server.get('/api/jobs_long_poll') def long_polling_jobs(): - try: - hash_token = request.args.get('token', None) - start_time = time.time() - while True: - all_jobs = jobs_json() - if all_jobs['token'] != hash_token: - return all_jobs - # Break after 30 seconds to avoid gateway timeout - if time.time() - start_time > 30: - return {}, 204 - time.sleep(1) - except DetachedInstanceError as e: - raise e - except Exception as e: - logger.error(f"Error fetching long_polling_jobs: {e}") - raise e - - -@server.route('/api/job//thumbnail') -def job_thumbnail(job_id): - - try: - big_thumb = request.args.get('size', False) == "big" - video_ok = request.args.get('video_ok', False) - found_job = RenderQueue.job_with_id(job_id, none_ok=False) - - # trigger a thumbnail update - just in case - PreviewManager.update_previews_for_job(found_job, wait_until_completion=True, timeout=60) - previews = PreviewManager.get_previews_for_job(found_job) - all_previews_list = previews.get('output', previews.get('input', [])) - - video_previews = [x for x in all_previews_list if x['kind'] == 'video'] - image_previews = [x for x in all_previews_list if x['kind'] == 'image'] - filtered_list = video_previews if video_previews and video_ok else image_previews - - # todo - sort by size or other metrics here - if filtered_list: - preview_to_send = filtered_list[0] - mime_types = {'image': 'image/jpeg', 'video': 'video/mp4'} - file_mime_type = mime_types.get(preview_to_send['kind'], 'unknown') - return send_file(preview_to_send['filename'], mimetype=file_mime_type) - except Exception as e: - logger.error(f'Error getting thumbnail: {e}') - return f'Error getting thumbnail: {e}', 500 - return "No thumbnail available", 404 - - -# Get job file routing -@server.route('/api/job//file/', methods=['GET']) -def get_job_file(job_id, filename): - found_job = RenderQueue.job_with_id(job_id) - try: - for full_path in found_job.file_list(): - if filename in full_path: - return send_file(path_or_file=full_path) - except FileNotFoundError: - abort(404) + hash_token = request.args.get('token', None) + start_time = time.time() + while True: + all_jobs = jobs_json() + if all_jobs['token'] != hash_token: + return all_jobs + # Break after 30 seconds to avoid gateway timeout + if time.time() - start_time > 30: + return {}, 204 + time.sleep(1) @server.get('/api/jobs/') @@ -159,20 +100,33 @@ def filtered_jobs_json(status_val): return f'Cannot find jobs with status {status_val}', 400 -@server.post('/api/job//send_subjob_update_notification') -def subjob_update_notification(job_id): - subjob_details = request.json - DistributedJobManager.handle_subjob_update_notification(RenderQueue.job_with_id(job_id), subjob_data=subjob_details) - return Response(status=200) - +# -------------------------------------------- +# Job Details / File Handling +# -------------------------------------------- @server.get('/api/job/') -def get_job_status(job_id): +def get_job_details(job_id): + """Retrieves the details of a requested job in JSON format + + Args: + job_id (str): The ID of the render job. + + Returns: + dict: A JSON representation of the job's details. + """ return RenderQueue.job_with_id(job_id).json() @server.get('/api/job//logs') def get_job_logs(job_id): + """Retrieves the log file for a specific render job. + + Args: + job_id (str): The ID of the render job. + + Returns: + Response: The log file's content as plain text, or an empty response if the log file is not found. + """ found_job = RenderQueue.job_with_id(job_id) log_path = system_safe_path(found_job.log_path()) log_data = None @@ -188,7 +142,7 @@ def get_file_list(job_id): @server.route('/api/job//download') -def download_file(job_id): +def download_requested_file(job_id): requested_filename = request.args.get('filename') if not requested_filename: @@ -203,7 +157,7 @@ def download_file(job_id): @server.route('/api/job//download_all') -def download_all(job_id): +def download_all_files(job_id): zip_filename = None @after_this_request @@ -218,6 +172,7 @@ def download_all(job_id): found_job = RenderQueue.job_with_id(job_id) output_dir = os.path.dirname(found_job.output_path) if os.path.exists(output_dir): + from zipfile import ZipFile zip_filename = system_safe_path(os.path.join(tempfile.gettempdir(), pathlib.Path(found_job.input_path).stem + '.zip')) with ZipFile(zip_filename, 'w') as zipObj: @@ -229,6 +184,10 @@ def download_all(job_id): return f'Cannot find project files for job {job_id}', 500 +# -------------------------------------------- +# System Environment / Status +# -------------------------------------------- + @server.get('/api/presets') def presets(): presets_path = system_safe_path('config/presets.yaml') @@ -260,13 +219,28 @@ def snapshot(): return server_data -@server.get('/api/_detected_clients') -def detected_clients(): - # todo: dev/debug only. Should not ship this - probably. - return ZeroconfServer.found_hostnames() +@server.route('/api/status') +def status(): + return {"timestamp": datetime.now().isoformat(), + "system_os": current_system_os(), + "system_os_version": current_system_os_version(), + "system_cpu": current_system_cpu(), + "cpu_percent": psutil.cpu_percent(percpu=False), + "cpu_percent_per_cpu": psutil.cpu_percent(percpu=True), + "cpu_count": psutil.cpu_count(logical=False), + "memory_total": psutil.virtual_memory().total, + "memory_available": psutil.virtual_memory().available, + "memory_percent": psutil.virtual_memory().percent, + "job_counts": RenderQueue.job_counts(), + "hostname": server.config['HOSTNAME'], + "port": server.config['PORT'] + } -# New version +# -------------------------------------------- +# Job Lifecyle (Create, Cancel, Delete) +# -------------------------------------------- + @server.post('/api/add_job') def add_job_handler(): # Process request data @@ -353,31 +327,9 @@ def delete_job(job_id): return f"Error deleting job: {e}", 500 -@server.get('/api/clear_history') -def clear_history(): - RenderQueue.clear_history() - return 'success' - - -@server.route('/api/status') -def status(): - - # Get system info - return {"timestamp": datetime.now().isoformat(), - "system_os": current_system_os(), - "system_os_version": current_system_os_version(), - "system_cpu": current_system_cpu(), - "cpu_percent": psutil.cpu_percent(percpu=False), - "cpu_percent_per_cpu": psutil.cpu_percent(percpu=True), - "cpu_count": psutil.cpu_count(logical=False), - "memory_total": psutil.virtual_memory().total, - "memory_available": psutil.virtual_memory().available, - "memory_percent": psutil.virtual_memory().percent, - "job_counts": RenderQueue.job_counts(), - "hostname": server.config['HOSTNAME'], - "port": server.config['PORT'] - } - +# -------------------------------------------- +# Engine Info and Management: +# -------------------------------------------- @server.get('/api/renderer_info') def renderer_info(): @@ -499,35 +451,95 @@ def get_renderer_help(renderer): return f"Cannot find renderer '{renderer}'", 400 +# -------------------------------------------- +# Miscellaneous: +# -------------------------------------------- +@server.post('/api/job//send_subjob_update_notification') +def subjob_update_notification(job_id): + subjob_details = request.json + DistributedJobManager.handle_subjob_update_notification(RenderQueue.job_with_id(job_id), subjob_data=subjob_details) + return Response(status=200) + + +@server.route('/api/job//thumbnail') +def job_thumbnail(job_id): + + try: + big_thumb = request.args.get('size', False) == "big" + video_ok = request.args.get('video_ok', False) + found_job = RenderQueue.job_with_id(job_id, none_ok=False) + + # trigger a thumbnail update - just in case + PreviewManager.update_previews_for_job(found_job, wait_until_completion=True, timeout=60) + previews = PreviewManager.get_previews_for_job(found_job) + all_previews_list = previews.get('output', previews.get('input', [])) + + video_previews = [x for x in all_previews_list if x['kind'] == 'video'] + image_previews = [x for x in all_previews_list if x['kind'] == 'image'] + filtered_list = video_previews if video_previews and video_ok else image_previews + + # todo - sort by size or other metrics here + if filtered_list: + preview_to_send = filtered_list[0] + mime_types = {'image': 'image/jpeg', 'video': 'video/mp4'} + file_mime_type = mime_types.get(preview_to_send['kind'], 'unknown') + return send_file(preview_to_send['filename'], mimetype=file_mime_type) + except Exception as e: + logger.error(f'Error getting thumbnail: {e}') + return f'Error getting thumbnail: {e}', 500 + return "No thumbnail available", 404 + + +# -------------------------------------------- +# System Benchmarks: +# -------------------------------------------- + @server.get('/api/cpu_benchmark') def get_cpu_benchmark_score(): + from src.utilities.benchmark import cpu_benchmark return str(cpu_benchmark(10)) @server.get('/api/disk_benchmark') def get_disk_benchmark(): + from src.utilities.benchmark import disk_io_benchmark results = disk_io_benchmark() return {'write_speed': results[0], 'read_speed': results[-1]} -def start_server(hostname=None): +# -------------------------------------------- +# Error Handlers: +# -------------------------------------------- - # get hostname - if not hostname: - local_hostname = socket.gethostname() - hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "") +@server.errorhandler(JobNotFoundError) +def handle_job_not_found(job_error): + return str(job_error), 400 - # load flask settings - server.config['HOSTNAME'] = hostname - server.config['PORT'] = int(Config.port_number) - server.config['UPLOAD_FOLDER'] = system_safe_path(os.path.expanduser(Config.upload_folder)) - server.config['MAX_CONTENT_PATH'] = Config.max_content_path - server.config['enable_split_jobs'] = Config.enable_split_jobs - # disable most Flask logging - flask_log = logging.getLogger('werkzeug') - flask_log.setLevel(Config.flask_log_level.upper()) +@server.errorhandler(DetachedInstanceError) +def handle_detached_instance(_): + return "Unavailable", 503 - logger.debug('Starting API server') - server.run(host='0.0.0.0', port=server.config['PORT'], debug=Config.flask_debug_enable, use_reloader=False, - threaded=True) + +@server.errorhandler(Exception) +def handle_general_error(general_error): + err_msg = f"Server error: {general_error}" + logger.error(err_msg) + return err_msg, 500 + + +# -------------------------------------------- +# Debug / Development Only: +# -------------------------------------------- + +@server.get('/api/_debug/detected_clients') +def detected_clients(): + # todo: dev/debug only. Should not ship this - probably. + from src.utilities.zeroconf_server import ZeroconfServer + return ZeroconfServer.found_hostnames() + + +@server.get('/api/_debug/clear_history') +def clear_history(): + RenderQueue.clear_history() + return 'success'