#!/usr/bin/env python3 import concurrent.futures import json import logging import shutil import socket import ssl import tempfile import time import traceback from datetime import datetime from pathlib import Path from typing import Dict, Any, Optional import cpuinfo import psutil import yaml from flask import Flask, request, send_file, after_this_request, Response, redirect, url_for from sqlalchemy.orm.exc import DetachedInstanceError from werkzeug.exceptions import HTTPException from src.api.job_import_handler import JobImportHandler from src.api.preview_manager import PreviewManager from src.distributed_job_manager import DistributedJobManager from src.engines.engine_manager import EngineManager from src.render_queue import RenderQueue, JobNotFoundError from src.utilities.config import Config from src.utilities.misc_helper import current_system_os, current_system_cpu, \ current_system_os_version, num_to_alphanumeric, get_gpu_info from src.utilities.status_utils import string_to_status from src.version import APP_VERSION logger = logging.getLogger() server = Flask(__name__) ssl._create_default_https_context = ssl._create_unverified_context # disable SSL for downloads API_VERSION = "0.1" def start_api_server(hostname: Optional[str] = None) -> None: # get hostname if not hostname: local_hostname = socket.gethostname() hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "") # load flask settings server.config['HOSTNAME'] = hostname server.config['PORT'] = int(Config.port_number) server.config['UPLOAD_FOLDER'] = str(Path(Config.upload_folder).expanduser()) server.config['MAX_CONTENT_PATH'] = Config.max_content_path server.config['enable_split_jobs'] = Config.enable_split_jobs # disable most Flask logging flask_log = logging.getLogger('werkzeug') flask_log.setLevel(Config.flask_log_level.upper()) logger.debug('Starting API server') try: server.run(host=hostname, port=server.config['PORT'], debug=Config.flask_debug_enable, use_reloader=False, threaded=True) finally: logger.debug('Stopping API server') # -------------------------------------------- # Get All Jobs # -------------------------------------------- @server.get('/api/jobs') def jobs_json() -> Dict[str, Any]: """Retrieves all jobs from the render queue in JSON format. This endpoint fetches all jobs currently in the render queue, converts them to JSON format, and returns them along with a cache token that represents the current state of the job list. Returns: dict: A dictionary containing: - 'jobs' (list[dict]): A list of job dictionaries, each representing a job in the queue. - 'token' (str): A cache token generated from the hash of the job list. """ all_jobs = [x.json() for x in RenderQueue.all_jobs()] job_cache_int = int(json.dumps(all_jobs).__hash__()) job_cache_token = num_to_alphanumeric(job_cache_int) return {'jobs': all_jobs, 'token': job_cache_token} @server.get('/api/jobs_long_poll') def long_polling_jobs(): hash_token = request.args.get('token', None) start_time = time.time() while True: all_jobs = jobs_json() if all_jobs['token'] != hash_token: return all_jobs # Break after 30 seconds to avoid gateway timeout if time.time() - start_time > 30: return {}, 204 time.sleep(1) @server.get('/api/jobs/') def filtered_jobs_json(status_val): state = string_to_status(status_val) jobs = [x.json() for x in RenderQueue.jobs_with_status(state)] if jobs: return jobs else: return f'Cannot find jobs with status {status_val}', 400 # -------------------------------------------- # Job Details / File Handling # -------------------------------------------- @server.get('/api/job/') def get_job_details(job_id): """Retrieves the details of a requested job in JSON format Args: job_id (str): The ID of the render job. Returns: dict: A JSON representation of the job's details. """ return RenderQueue.job_with_id(job_id).json() @server.get('/api/job//logs') def get_job_logs(job_id): """Retrieves the log file for a specific render job. Args: job_id (str): The ID of the render job. Returns: Response: The log file's content as plain text, or an empty response if the log file is not found. """ found_job = RenderQueue.job_with_id(job_id) log_path = Path(found_job.log_path()) log_data = None if log_path and log_path.exists(): with open(log_path) as file: log_data = file.read() return Response(log_data, mimetype='text/plain') @server.get('/api/job//file_list') def get_file_list(job_id): return [Path(p).name for p in RenderQueue.job_with_id(job_id).file_list()] @server.route('/api/job//download') def download_requested_file(job_id): requested_filename = request.args.get("filename") if not requested_filename: return "Filename required", 400 found_job = RenderQueue.job_with_id(job_id) for job_file in found_job.file_list(): p = Path(job_file) if p.name.lower() == requested_filename.lower(): return send_file(str(p), as_attachment=True) return f"File '{requested_filename}' not found", 404 @server.route('/api/job//download_all') def download_all_files(job_id): zip_filename = None @after_this_request def clear_zip(response): if zip_filename and zip_filename.exists(): try: zip_filename.unlink() except Exception as e: logger.warning(f"Error removing zip file '{zip_filename}': {e}") return response found_job = RenderQueue.job_with_id(job_id) output_dir = Path(found_job.output_path).parent if not output_dir.exists(): return f"Cannot find project files for job {job_id}", 500 zip_filename = Path(tempfile.gettempdir()) / f"{Path(found_job.input_path).stem}.zip" from zipfile import ZipFile with ZipFile(zip_filename, "w") as zipObj: for f in output_dir.iterdir(): if f.is_file(): zipObj.write(f, arcname=f.name) return send_file(str(zip_filename), mimetype="zip", as_attachment=True) # -------------------------------------------- # System Environment / Status # -------------------------------------------- @server.get('/api/presets') def presets() -> Dict[str, Any]: presets_path = Path('config/presets.yaml') with open(presets_path) as f: loaded_presets = yaml.load(f, Loader=yaml.FullLoader) return loaded_presets @server.get('/api/full_status') def full_status(): full_results = {'timestamp': datetime.now().isoformat(), 'servers': {}} try: snapshot_results = snapshot() server_data = {'status': snapshot_results.get('status', {}), 'jobs': snapshot_results.get('jobs', {}), 'is_online': True} full_results['servers'][server.config['HOSTNAME']] = server_data except Exception as e: logger.error(f"Exception fetching full status: {e}") return full_results @server.get('/api/snapshot') def snapshot(): server_status = status() server_jobs = [x.json() for x in RenderQueue.all_jobs()] server_data = {'status': server_status, 'jobs': server_jobs, 'timestamp': datetime.now().isoformat()} return server_data @server.route('/api/status') def status(): return {"timestamp": datetime.now().isoformat(), "system_os": current_system_os(), "system_os_version": current_system_os_version(), "system_cpu": current_system_cpu(), "system_cpu_brand": cpuinfo.get_cpu_info()['brand_raw'], "cpu_percent": psutil.cpu_percent(percpu=False), "cpu_percent_per_cpu": psutil.cpu_percent(percpu=True), "cpu_count": psutil.cpu_count(logical=False), "memory_total": psutil.virtual_memory().total, "memory_available": psutil.virtual_memory().available, "memory_percent": psutil.virtual_memory().percent, "job_counts": RenderQueue.job_counts(), "hostname": server.config['HOSTNAME'], "port": server.config['PORT'], "app_version": APP_VERSION, "api_version": API_VERSION } # -------------------------------------------- # Job Lifecyle (Create, Cancel, Delete) # -------------------------------------------- @server.post('/api/add_job') def add_job_handler(): """ POST /api/add_job Add a render job to the queue. **Request Formats** - JSON body: { "name": "example.blend", "engine": "blender", "frame_start": 1, "frame_end": 100, "render_settings": {...} "child_jobs"; [...] } **Responses** 200 Success 400 Invalid or missing input 500 Internal server error while parsing or creating jobs """ try: if request.is_json: new_job_data = request.get_json() elif request.form.get('json', None): new_job_data = json.loads(request.form['json']) else: return "Cannot find valid job data", 400 except Exception as e: err_msg = f"Error processing job data: {e}" logger.error(err_msg) return err_msg, 500 # Validate Job Data - check for required values and download or unzip project files try: processed_job_data = JobImportHandler.validate_job_data(new_job_data, server.config['UPLOAD_FOLDER'], uploaded_file=request.files.get('file')) except (KeyError, FileNotFoundError) as e: err_msg = f"Error processing job data: {e}" return err_msg, 400 except Exception as e: traceback.print_exception(e) err_msg = f"Unknown error processing data: {e}" return err_msg, 500 try: return JobImportHandler.create_jobs_from_processed_data(processed_job_data) except Exception as e: logger.exception(f"Error creating render job: {e}") return 'unknown error', 500 @server.post('/api/job//cancel') def cancel_job(job_id): if not request.args.get('confirm', False): return 'Confirmation required to cancel job', 400 if RenderQueue.cancel_job(RenderQueue.job_with_id(job_id)): if request.args.get('redirect', False): return redirect(url_for('index')) else: return "Job cancelled" else: return "Unknown error", 500 @server.post('/api/job//delete') def delete_job(job_id): try: if not request.args.get("confirm", False): return "Confirmation required to delete job", 400 found_job = RenderQueue.job_with_id(job_id) input_path = Path(found_job.input_path) output_path = Path(found_job.output_path) upload_root = Path(server.config["UPLOAD_FOLDER"]) project_dir = input_path.parent.parent output_dir = output_path.parent found_job.stop() try: PreviewManager.delete_previews_for_job(found_job) except Exception as e: logger.error(f"Error deleting previews for {found_job}: {e}") RenderQueue.delete_job(found_job) # Delete output directory if we own it if output_dir.exists() and output_dir.is_relative_to(upload_root): shutil.rmtree(output_dir) # Delete project directory if we own it and it's unused try: if project_dir.exists() and project_dir.is_relative_to(upload_root): project_dir_files = [p for p in project_dir.iterdir() if not p.name.startswith(".")] if not project_dir_files or (len(project_dir_files) == 1 and "source" in project_dir_files[0].name): logger.info(f"Removing project directory: {project_dir}") shutil.rmtree(project_dir) except Exception as e: logger.error(f"Error removing project files: {e}") return "Job deleted", 200 except Exception as e: logger.error(f"Error deleting job: {e}") return f"Error deleting job: {e}", 500 # -------------------------------------------- # Engine Info and Management: # -------------------------------------------- @server.get('/api/engines/for_filename') def get_engine_for_filename(): filename = request.args.get("filename") if not filename: return "Error: filename is required", 400 found_engine = EngineManager.engine_class_for_project_path(filename) if not found_engine: return f"Error: cannot find a suitable engine for '{filename}'", 400 return found_engine.name() def _validated_engine_response_type(): response_type = request.args.get('response_type', 'standard') if response_type not in ['full', 'standard']: raise ValueError(f"Invalid response_type: {response_type}") return response_type def _engine_info_for_engine(engine_class, response_type='standard'): try: installed_versions = EngineManager.all_version_data_for_engine(engine_class.name()) if not installed_versions: return None system_installed_versions = [v for v in installed_versions if v['type'] == 'system'] install_path = ( system_installed_versions[0]['path'] if system_installed_versions else installed_versions[0]['path'] ) engine = engine_class(install_path) engine_name = engine.name() result = { 'is_available': RenderQueue.is_available_for_job(engine_name), 'versions': installed_versions } if response_type == 'full': with concurrent.futures.ThreadPoolExecutor() as executor: future_results = { 'supported_extensions': executor.submit(engine.supported_extensions), 'supported_export_formats': executor.submit(engine.get_output_formats), 'system_info': executor.submit(engine.system_info), 'options': executor.submit(engine.ui_options) } for key, future in future_results.items(): result[key] = future.result() return result except Exception as e: logger.error(f"Error fetching details for engine '{engine_class.name()}': {e}") return {} @server.get('/api/engines') def get_engines_info(): response_type = _validated_engine_response_type() engine_data = {} with concurrent.futures.ThreadPoolExecutor() as executor: futures = { executor.submit(_engine_info_for_engine, engine, response_type): engine.name() for engine in EngineManager.supported_engines() } for future in concurrent.futures.as_completed(futures): result = future.result() if result: engine_data[futures[future]] = result return engine_data @server.get('/api/engines/names') def get_engine_names(): result = [] for engine_class in EngineManager.supported_engines(): data = EngineManager.all_version_data_for_engine(engine_class.name()) if data: result.append(engine_class.name()) return result @server.get('/api/engines/') def get_engine(engine_name): try: response_type = _validated_engine_response_type() engine_class = EngineManager.engine_class_with_name(engine_name) return _engine_info_for_engine(engine_class, response_type) or {} except Exception as e: logger.error(f"Error fetching details for engine '{engine_name}': {e}") return {} @server.get('/api/engines//availability') def get_engine_availability(engine_name): return {'engine': engine_name, 'available': RenderQueue.is_available_for_job(engine_name), 'cpu_count': int(psutil.cpu_count(logical=False)), 'versions': EngineManager.all_version_data_for_engine(engine_name), 'hostname': server.config.get('HOSTNAME', socket.gethostname())} @server.get('/api/engines//args') def get_engine_args(engine_name): try: engine_class = EngineManager.engine_class_with_name(engine_name) if not engine_class: return f"Cannot find engine '{engine_name}'", 400 return engine_class().get_arguments() except LookupError: return f"Cannot find engine '{engine_name}'", 400 @server.get('/api/engines//help') def get_engine_help(engine_name): try: engine_class = EngineManager.engine_class_with_name(engine_name) if not engine_class: return f"Cannot find engine '{engine_name}'", 400 return engine_class().get_help() except LookupError: return f"Cannot find engine '{engine_name}'", 400 # -------------------------------------------- # Engine Downloads and Updates: # -------------------------------------------- @server.get('/api/engines/download_available') def is_engine_available_to_download(): available_result = EngineManager.version_is_available_to_download(request.args.get('engine'), request.args.get('version'), request.args.get('system_os'), request.args.get('cpu')) return available_result if available_result else \ (f"Cannot find available download for {request.args.get('engine')} {request.args.get('version')}", 500) @server.get('/api/engines/most_recent_version') def find_most_recent_version(): most_recent = EngineManager.find_most_recent_version(request.args.get('engine'), request.args.get('system_os'), request.args.get('cpu')) return most_recent if most_recent else \ (f"Error finding most recent version of {request.args.get('engine')}", 500) @server.post('/api/engines/download') def download_engine(): download_result = EngineManager.download_engine(request.args.get('engine'), request.args.get('version'), request.args.get('system_os'), request.args.get('cpu')) return download_result if download_result else \ (f"Error downloading {request.args.get('engine')} {request.args.get('version')}", 500) @server.post('/api/engines/delete') def delete_engine_download(): json_data = request.json delete_result = EngineManager.delete_engine_download(json_data.get('engine'), json_data.get('version'), json_data.get('system_os'), json_data.get('cpu')) return "Success" if delete_result else \ (f"Error deleting {json_data.get('engine')} {json_data.get('version')}", 500) # -------------------------------------------- # Miscellaneous: # -------------------------------------------- @server.get('/api/heartbeat') def heartbeat(): return datetime.now().isoformat(), 200 @server.post('/api/job//send_subjob_update_notification') def subjob_update_notification(job_id): subjob_details = request.json DistributedJobManager.handle_subjob_update_notification(RenderQueue.job_with_id(job_id), subjob_data=subjob_details) return Response(status=200) @server.route('/api/job//thumbnail') def job_thumbnail(job_id): try: big_thumb = request.args.get('size', False) == "big" video_ok = request.args.get('video_ok', False) found_job = RenderQueue.job_with_id(job_id, none_ok=False) # trigger a thumbnail update - just in case PreviewManager.update_previews_for_job(found_job, wait_until_completion=True, timeout=60) previews = PreviewManager.get_previews_for_job(found_job) all_previews_list = previews.get('output', previews.get('input', [])) video_previews = [x for x in all_previews_list if x['kind'] == 'video'] image_previews = [x for x in all_previews_list if x['kind'] == 'image'] filtered_list = video_previews if video_previews and video_ok else image_previews # todo - sort by size or other metrics here if filtered_list: preview_to_send = filtered_list[0] mime_types = {'image': 'image/jpeg', 'video': 'video/mp4'} file_mime_type = mime_types.get(preview_to_send['kind'], 'unknown') return send_file(preview_to_send['filename'], mimetype=file_mime_type) except Exception as e: logger.error(f'Error getting thumbnail: {e}') return f'Error getting thumbnail: {e}', 500 return "No thumbnail available", 404 # -------------------------------------------- # System Benchmarks: # -------------------------------------------- @server.get('/api/cpu_benchmark') def get_cpu_benchmark_score(): from src.utilities.benchmark import cpu_benchmark return str(cpu_benchmark(10)) @server.get('/api/disk_benchmark') def get_disk_benchmark(): from src.utilities.benchmark import disk_io_benchmark results = disk_io_benchmark() return {'write_speed': results[0], 'read_speed': results[-1]} # -------------------------------------------- # Error Handlers: # -------------------------------------------- @server.errorhandler(JobNotFoundError) def handle_job_not_found(job_error): return str(job_error), 400 @server.errorhandler(DetachedInstanceError) def handle_detached_instance(_): return "Unavailable", 503 @server.errorhandler(404) def handle_404(error): url = request.url err_msg = f"404 Not Found: {url}" if 'favicon' not in url: logger.warning(err_msg) return err_msg, 404 @server.errorhandler(Exception) def handle_general_error(general_error): if isinstance(general_error, HTTPException): return general_error.description, general_error.code traceback.print_exception(type(general_error), general_error, general_error.__traceback__) err_msg = f"Server error: {general_error}" logger.error(err_msg) return err_msg, 500 # -------------------------------------------- # Debug / Development Only: # -------------------------------------------- @server.get('/api/_debug/detected_clients') def detected_clients(): # todo: dev/debug only. Should not ship this - probably. from src.utilities.zeroconf_server import ZeroconfServer return ZeroconfServer.found_hostnames() @server.post('/api/_debug/clear_history') def clear_history(): RenderQueue.clear_history() return 'success'