#!/usr/bin/env python3 import concurrent.futures import json import logging import os import pathlib import shutil import socket import ssl import tempfile import time from datetime import datetime import psutil import yaml from flask import Flask, request, send_file, after_this_request, Response, redirect, url_for from sqlalchemy.orm.exc import DetachedInstanceError from src.api.add_job_helpers import handle_uploaded_project_files, process_zipped_project from src.api.preview_manager import PreviewManager from src.distributed_job_manager import DistributedJobManager from src.engines.engine_manager import EngineManager from src.render_queue import RenderQueue, JobNotFoundError from src.utilities.config import Config from src.utilities.misc_helper import system_safe_path, current_system_os, current_system_cpu, \ current_system_os_version, num_to_alphanumeric from src.utilities.status_utils import string_to_status from version import APP_VERSION logger = logging.getLogger() server = Flask(__name__) ssl._create_default_https_context = ssl._create_unverified_context # disable SSL for downloads API_VERSION = "1" def start_server(hostname=None): # get hostname if not hostname: local_hostname = socket.gethostname() hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "") # load flask settings server.config['HOSTNAME'] = hostname server.config['PORT'] = int(Config.port_number) server.config['UPLOAD_FOLDER'] = system_safe_path(os.path.expanduser(Config.upload_folder)) server.config['MAX_CONTENT_PATH'] = Config.max_content_path server.config['enable_split_jobs'] = Config.enable_split_jobs # disable most Flask logging flask_log = logging.getLogger('werkzeug') flask_log.setLevel(Config.flask_log_level.upper()) logger.debug('Starting API server') try: server.run(host=hostname, port=server.config['PORT'], debug=Config.flask_debug_enable, use_reloader=False, threaded=True) finally: logger.debug('Stopping API server') # -------------------------------------------- # Get All Jobs # -------------------------------------------- @server.get('/api/jobs') def jobs_json(): """Retrieves all jobs from the render queue in JSON format. This endpoint fetches all jobs currently in the render queue, converts them to JSON format, and returns them along with a cache token that represents the current state of the job list. Returns: dict: A dictionary containing: - 'jobs' (list[dict]): A list of job dictionaries, each representing a job in the queue. - 'token' (str): A cache token generated from the hash of the job list. """ all_jobs = [x.json() for x in RenderQueue.all_jobs()] job_cache_int = int(json.dumps(all_jobs).__hash__()) job_cache_token = num_to_alphanumeric(job_cache_int) return {'jobs': all_jobs, 'token': job_cache_token} @server.get('/api/jobs_long_poll') def long_polling_jobs(): hash_token = request.args.get('token', None) start_time = time.time() while True: all_jobs = jobs_json() if all_jobs['token'] != hash_token: return all_jobs # Break after 30 seconds to avoid gateway timeout if time.time() - start_time > 30: return {}, 204 time.sleep(1) @server.get('/api/jobs/') def filtered_jobs_json(status_val): state = string_to_status(status_val) jobs = [x.json() for x in RenderQueue.jobs_with_status(state)] if jobs: return jobs else: return f'Cannot find jobs with status {status_val}', 400 # -------------------------------------------- # Job Details / File Handling # -------------------------------------------- @server.get('/api/job/') def get_job_details(job_id): """Retrieves the details of a requested job in JSON format Args: job_id (str): The ID of the render job. Returns: dict: A JSON representation of the job's details. """ return RenderQueue.job_with_id(job_id).json() @server.get('/api/job//logs') def get_job_logs(job_id): """Retrieves the log file for a specific render job. Args: job_id (str): The ID of the render job. Returns: Response: The log file's content as plain text, or an empty response if the log file is not found. """ found_job = RenderQueue.job_with_id(job_id) log_path = system_safe_path(found_job.log_path()) log_data = None if log_path and os.path.exists(log_path): with open(log_path) as file: log_data = file.read() return Response(log_data, mimetype='text/plain') @server.get('/api/job//file_list') def get_file_list(job_id): return [os.path.basename(x) for x in RenderQueue.job_with_id(job_id).file_list()] @server.route('/api/job//download') def download_requested_file(job_id): requested_filename = request.args.get('filename') if not requested_filename: return 'Filename required', 400 found_job = RenderQueue.job_with_id(job_id) for job_filename in found_job.file_list(): if os.path.basename(job_filename).lower() == requested_filename.lower(): return send_file(job_filename, as_attachment=True, ) return f"File '{requested_filename}' not found", 404 @server.route('/api/job//download_all') def download_all_files(job_id): zip_filename = None @after_this_request def clear_zip(response): if zip_filename and os.path.exists(zip_filename): try: os.remove(zip_filename) except Exception as e: logger.warning(f"Error removing zip file '{zip_filename}': {e}") return response found_job = RenderQueue.job_with_id(job_id) output_dir = os.path.dirname(found_job.output_path) if os.path.exists(output_dir): from zipfile import ZipFile zip_filename = system_safe_path(os.path.join(tempfile.gettempdir(), pathlib.Path(found_job.input_path).stem + '.zip')) with ZipFile(zip_filename, 'w') as zipObj: for f in os.listdir(output_dir): zipObj.write(filename=system_safe_path(os.path.join(output_dir, f)), arcname=os.path.basename(f)) return send_file(zip_filename, mimetype="zip", as_attachment=True, ) else: return f'Cannot find project files for job {job_id}', 500 # -------------------------------------------- # System Environment / Status # -------------------------------------------- @server.get('/api/presets') def presets(): presets_path = system_safe_path('config/presets.yaml') with open(presets_path) as f: loaded_presets = yaml.load(f, Loader=yaml.FullLoader) return loaded_presets @server.get('/api/full_status') def full_status(): full_results = {'timestamp': datetime.now().isoformat(), 'servers': {}} try: snapshot_results = snapshot() server_data = {'status': snapshot_results.get('status', {}), 'jobs': snapshot_results.get('jobs', {}), 'is_online': True} full_results['servers'][server.config['HOSTNAME']] = server_data except Exception as e: logger.error(f"Exception fetching full status: {e}") return full_results @server.get('/api/snapshot') def snapshot(): server_status = status() server_jobs = [x.json() for x in RenderQueue.all_jobs()] server_data = {'status': server_status, 'jobs': server_jobs, 'timestamp': datetime.now().isoformat()} return server_data @server.route('/api/status') def status(): return {"timestamp": datetime.now().isoformat(), "system_os": current_system_os(), "system_os_version": current_system_os_version(), "system_cpu": current_system_cpu(), "cpu_percent": psutil.cpu_percent(percpu=False), "cpu_percent_per_cpu": psutil.cpu_percent(percpu=True), "cpu_count": psutil.cpu_count(logical=False), "memory_total": psutil.virtual_memory().total, "memory_available": psutil.virtual_memory().available, "memory_percent": psutil.virtual_memory().percent, "job_counts": RenderQueue.job_counts(), "hostname": server.config['HOSTNAME'], "port": server.config['PORT'], "app_version": APP_VERSION, "api_version": API_VERSION } # -------------------------------------------- # Job Lifecyle (Create, Cancel, Delete) # -------------------------------------------- @server.post('/api/add_job') def add_job_handler(): # Process request data try: if request.is_json: jobs_list = [request.json] if not isinstance(request.json, list) else request.json elif request.form.get('json', None): jobs_list = json.loads(request.form['json']) else: return "Invalid data", 400 except Exception as e: err_msg = f"Error processing job data: {e}" logger.error(err_msg) return err_msg, 500 try: loaded_project_local_path, referred_name = handle_uploaded_project_files(request, jobs_list, server.config['UPLOAD_FOLDER']) if loaded_project_local_path.lower().endswith('.zip'): loaded_project_local_path = process_zipped_project(loaded_project_local_path) results = [] for new_job_data in jobs_list: new_job = DistributedJobManager.create_render_job(new_job_data, loaded_project_local_path) results.append(new_job.json()) return results, 200 except Exception as e: logger.exception(f"Error adding job: {e}") return 'unknown error', 500 @server.get('/api/job//cancel') def cancel_job(job_id): if not request.args.get('confirm', False): return 'Confirmation required to cancel job', 400 if RenderQueue.cancel_job(RenderQueue.job_with_id(job_id)): if request.args.get('redirect', False): return redirect(url_for('index')) else: return "Job cancelled" else: return "Unknown error", 500 @server.route('/api/job//delete', methods=['POST', 'GET']) def delete_job(job_id): try: if not request.args.get('confirm', False): return 'Confirmation required to delete job', 400 # Check if we can remove the 'output' directory found_job = RenderQueue.job_with_id(job_id) project_dir = os.path.dirname(os.path.dirname(found_job.input_path)) output_dir = os.path.dirname(found_job.output_path) found_job.stop() try: PreviewManager.delete_previews_for_job(found_job) except Exception as e: logger.error(f"Error deleting previews for {found_job}: {e}") # finally delete the job RenderQueue.delete_job(found_job) # delete the output_dir if server.config['UPLOAD_FOLDER'] in output_dir and os.path.exists(output_dir): shutil.rmtree(output_dir) # See if we own the project_dir (i.e. was it uploaded) - if so delete the directory try: if server.config['UPLOAD_FOLDER'] in project_dir and os.path.exists(project_dir): # check to see if any other projects are sharing the same project file project_dir_files = [f for f in os.listdir(project_dir) if not f.startswith('.')] if len(project_dir_files) == 0 or (len(project_dir_files) == 1 and 'source' in project_dir_files[0]): logger.info(f"Removing project directory: {project_dir}") shutil.rmtree(project_dir) except Exception as e: logger.error(f"Error removing project files: {e}") return "Job deleted", 200 except Exception as e: logger.error(f"Error deleting job: {e}") return f"Error deleting job: {e}", 500 # -------------------------------------------- # Engine Info and Management: # -------------------------------------------- @server.get('/api/renderer_info') def renderer_info(): response_type = request.args.get('response_type', 'standard') if response_type not in ['full', 'standard']: raise ValueError(f"Invalid response_type: {response_type}") def process_engine(engine): try: # Get all installed versions of the engine installed_versions = EngineManager.all_versions_for_engine(engine.name()) if not installed_versions: return None system_installed_versions = [v for v in installed_versions if v['type'] == 'system'] install_path = system_installed_versions[0]['path'] if system_installed_versions else installed_versions[0]['path'] en = engine(install_path) engine_name = en.name() result = { engine_name: { 'is_available': RenderQueue.is_available_for_job(engine_name), 'versions': installed_versions } } if response_type == 'full': with concurrent.futures.ThreadPoolExecutor() as executor: future_results = { 'supported_extensions': executor.submit(en.supported_extensions), 'supported_export_formats': executor.submit(en.get_output_formats), 'system_info': executor.submit(en.system_info) } for key, future in future_results.items(): result[engine_name][key] = future.result() return result except Exception as e: logger.error(f'Error fetching details for {engine.name()} renderer: {e}') raise e renderer_data = {} with concurrent.futures.ThreadPoolExecutor() as executor: futures = {executor.submit(process_engine, engine): engine.name() for engine in EngineManager.supported_engines()} for future in concurrent.futures.as_completed(futures): result = future.result() if result: renderer_data.update(result) return renderer_data @server.get('/api//is_available') def is_engine_available(engine_name): return {'engine': engine_name, 'available': RenderQueue.is_available_for_job(engine_name), 'cpu_count': int(psutil.cpu_count(logical=False)), 'versions': EngineManager.all_versions_for_engine(engine_name), 'hostname': server.config['HOSTNAME']} @server.get('/api/is_engine_available_to_download') def is_engine_available_to_download(): available_result = EngineManager.version_is_available_to_download(request.args.get('engine'), request.args.get('version'), request.args.get('system_os'), request.args.get('cpu')) return available_result if available_result else \ (f"Cannot find available download for {request.args.get('engine')} {request.args.get('version')}", 500) @server.get('/api/find_most_recent_version') def find_most_recent_version(): most_recent = EngineManager.find_most_recent_version(request.args.get('engine'), request.args.get('system_os'), request.args.get('cpu')) return most_recent if most_recent else \ (f"Error finding most recent version of {request.args.get('engine')}", 500) @server.post('/api/download_engine') def download_engine(): download_result = EngineManager.download_engine(request.args.get('engine'), request.args.get('version'), request.args.get('system_os'), request.args.get('cpu')) return download_result if download_result else \ (f"Error downloading {request.args.get('engine')} {request.args.get('version')}", 500) @server.post('/api/delete_engine') def delete_engine_download(): json_data = request.json delete_result = EngineManager.delete_engine_download(json_data.get('engine'), json_data.get('version'), json_data.get('system_os'), json_data.get('cpu')) return "Success" if delete_result else \ (f"Error deleting {json_data.get('engine')} {json_data.get('version')}", 500) @server.get('/api/renderer//args') def get_renderer_args(renderer): try: renderer_engine_class = EngineManager.engine_with_name(renderer) return renderer_engine_class().get_arguments() except LookupError: return f"Cannot find renderer '{renderer}'", 400 @server.get('/api/renderer//help') def get_renderer_help(renderer): try: renderer_engine_class = EngineManager.engine_with_name(renderer) return renderer_engine_class().get_help() except LookupError: return f"Cannot find renderer '{renderer}'", 400 # -------------------------------------------- # Miscellaneous: # -------------------------------------------- @server.post('/api/job//send_subjob_update_notification') def subjob_update_notification(job_id): subjob_details = request.json DistributedJobManager.handle_subjob_update_notification(RenderQueue.job_with_id(job_id), subjob_data=subjob_details) return Response(status=200) @server.route('/api/job//thumbnail') def job_thumbnail(job_id): try: big_thumb = request.args.get('size', False) == "big" video_ok = request.args.get('video_ok', False) found_job = RenderQueue.job_with_id(job_id, none_ok=False) # trigger a thumbnail update - just in case PreviewManager.update_previews_for_job(found_job, wait_until_completion=True, timeout=60) previews = PreviewManager.get_previews_for_job(found_job) all_previews_list = previews.get('output', previews.get('input', [])) video_previews = [x for x in all_previews_list if x['kind'] == 'video'] image_previews = [x for x in all_previews_list if x['kind'] == 'image'] filtered_list = video_previews if video_previews and video_ok else image_previews # todo - sort by size or other metrics here if filtered_list: preview_to_send = filtered_list[0] mime_types = {'image': 'image/jpeg', 'video': 'video/mp4'} file_mime_type = mime_types.get(preview_to_send['kind'], 'unknown') return send_file(preview_to_send['filename'], mimetype=file_mime_type) except Exception as e: logger.error(f'Error getting thumbnail: {e}') return f'Error getting thumbnail: {e}', 500 return "No thumbnail available", 404 # -------------------------------------------- # System Benchmarks: # -------------------------------------------- @server.get('/api/cpu_benchmark') def get_cpu_benchmark_score(): from src.utilities.benchmark import cpu_benchmark return str(cpu_benchmark(10)) @server.get('/api/disk_benchmark') def get_disk_benchmark(): from src.utilities.benchmark import disk_io_benchmark results = disk_io_benchmark() return {'write_speed': results[0], 'read_speed': results[-1]} # -------------------------------------------- # Error Handlers: # -------------------------------------------- @server.errorhandler(JobNotFoundError) def handle_job_not_found(job_error): return str(job_error), 400 @server.errorhandler(DetachedInstanceError) def handle_detached_instance(_): return "Unavailable", 503 @server.errorhandler(Exception) def handle_general_error(general_error): err_msg = f"Server error: {general_error}" logger.error(err_msg) return err_msg, 500 # -------------------------------------------- # Debug / Development Only: # -------------------------------------------- @server.get('/api/_debug/detected_clients') def detected_clients(): # todo: dev/debug only. Should not ship this - probably. from src.utilities.zeroconf_server import ZeroconfServer return ZeroconfServer.found_hostnames() @server.get('/api/_debug/clear_history') def clear_history(): RenderQueue.clear_history() return 'success'