Reorganize api_server.py

Brett Williams
2024-08-21 00:35:19 -05:00
parent 06e7bb15d8
commit 5534f0b1b2


@@ -10,87 +10,74 @@ import ssl
 import tempfile
 import time
 from datetime import datetime
-from zipfile import ZipFile
 import psutil
 import yaml
-from flask import Flask, request, send_file, after_this_request, Response, redirect, url_for, abort
+from flask import Flask, request, send_file, after_this_request, Response, redirect, url_for
 from sqlalchemy.orm.exc import DetachedInstanceError
 from src.api.add_job_helpers import handle_uploaded_project_files, process_zipped_project
 from src.api.preview_manager import PreviewManager
 from src.distributed_job_manager import DistributedJobManager
-from src.engines.core.base_worker import string_to_status, RenderStatus
 from src.engines.engine_manager import EngineManager
 from src.render_queue import RenderQueue, JobNotFoundError
-from src.utilities.benchmark import cpu_benchmark, disk_io_benchmark
 from src.utilities.config import Config
 from src.utilities.misc_helper import system_safe_path, current_system_os, current_system_cpu, \
     current_system_os_version, num_to_alphanumeric
-from src.utilities.zeroconf_server import ZeroconfServer
+from src.utilities.status_utils import string_to_status
 
 logger = logging.getLogger()
 server = Flask(__name__)
 ssl._create_default_https_context = ssl._create_unverified_context # disable SSL for downloads
 
-categories = [RenderStatus.RUNNING, RenderStatus.ERROR, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED,
-              RenderStatus.COMPLETED, RenderStatus.CANCELLED]
+
+def start_server(hostname=None):
+    # get hostname
+    if not hostname:
+        local_hostname = socket.gethostname()
+        hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "")
+    # load flask settings
+    server.config['HOSTNAME'] = hostname
+    server.config['PORT'] = int(Config.port_number)
+    server.config['UPLOAD_FOLDER'] = system_safe_path(os.path.expanduser(Config.upload_folder))
+    server.config['MAX_CONTENT_PATH'] = Config.max_content_path
+    server.config['enable_split_jobs'] = Config.enable_split_jobs
+    # disable most Flask logging
+    flask_log = logging.getLogger('werkzeug')
+    flask_log.setLevel(Config.flask_log_level.upper())
+    logger.debug('Starting API server')
+    server.run(host='0.0.0.0', port=server.config['PORT'], debug=Config.flask_debug_enable, use_reloader=False,
+               threaded=True)
-# -- Error Handlers --
-@server.errorhandler(JobNotFoundError)
-def handle_job_not_found(job_error):
-    return str(job_error), 400
-
-
-@server.errorhandler(DetachedInstanceError)
-def handle_detached_instance(error):
-    # logger.debug(f"detached instance: {error}")
-    return "Unavailable", 503
-
-
-@server.errorhandler(Exception)
-def handle_general_error(general_error):
-    err_msg = f"Server error: {general_error}"
-    logger.error(err_msg)
-    return err_msg, 500
-
-
-# -- Jobs --
-def sorted_jobs(all_jobs, sort_by_date=True):
-    if not sort_by_date:
-        sorted_job_list = []
-        if all_jobs:
-            for status_category in categories:
-                found_jobs = [x for x in all_jobs if x.status == status_category.value]
-                if found_jobs:
-                    sorted_found_jobs = sorted(found_jobs, key=lambda d: d.date_created, reverse=True)
-                    sorted_job_list.extend(sorted_found_jobs)
-    else:
-        sorted_job_list = sorted(all_jobs, key=lambda d: d.date_created, reverse=True)
-    return sorted_job_list
+# --------------------------------------------
+# Get All Jobs
+# --------------------------------------------
 @server.get('/api/jobs')
 def jobs_json():
-    try:
+    """Retrieves all jobs from the render queue in JSON format.
+
+    This endpoint fetches all jobs currently in the render queue, converts them to JSON format,
+    and returns them along with a cache token that represents the current state of the job list.
+
+    Returns:
+        dict: A dictionary containing:
+            - 'jobs' (list[dict]): A list of job dictionaries, each representing a job in the queue.
+            - 'token' (str): A cache token generated from the hash of the job list.
+    """
     all_jobs = [x.json() for x in RenderQueue.all_jobs()]
     job_cache_int = int(json.dumps(all_jobs).__hash__())
     job_cache_token = num_to_alphanumeric(job_cache_int)
     return {'jobs': all_jobs, 'token': job_cache_token}
-    except DetachedInstanceError as e:
-        raise e
-    except Exception as e:
-        logger.error(f"Error fetching jobs_json: {e}")
-        raise e
 
 
 @server.get('/api/jobs_long_poll')
 def long_polling_jobs():
-    try:
     hash_token = request.args.get('token', None)
     start_time = time.time()
     while True:
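Reviewer note on the /api/jobs contract documented above: a minimal client sketch, not part of the commit, assuming the requests package and a placeholder base URL of http://localhost:8080 (the real port comes from Config.port_number).

    import requests

    BASE_URL = "http://localhost:8080"  # assumed host/port for illustration only

    # Fetch every job plus the cache token that fingerprints the current list.
    resp = requests.get(f"{BASE_URL}/api/jobs", timeout=10)
    resp.raise_for_status()
    payload = resp.json()
    jobs, token = payload["jobs"], payload["token"]
    print(f"{len(jobs)} jobs, cache token {token}")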
@@ -101,52 +88,6 @@ def long_polling_jobs():
         if time.time() - start_time > 30:
             return {}, 204
         time.sleep(1)
-    except DetachedInstanceError as e:
-        raise e
-    except Exception as e:
-        logger.error(f"Error fetching long_polling_jobs: {e}")
-        raise e
-
-
-@server.route('/api/job/<job_id>/thumbnail')
-def job_thumbnail(job_id):
-    try:
-        big_thumb = request.args.get('size', False) == "big"
-        video_ok = request.args.get('video_ok', False)
-        found_job = RenderQueue.job_with_id(job_id, none_ok=False)
-        # trigger a thumbnail update - just in case
-        PreviewManager.update_previews_for_job(found_job, wait_until_completion=True, timeout=60)
-        previews = PreviewManager.get_previews_for_job(found_job)
-        all_previews_list = previews.get('output', previews.get('input', []))
-        video_previews = [x for x in all_previews_list if x['kind'] == 'video']
-        image_previews = [x for x in all_previews_list if x['kind'] == 'image']
-        filtered_list = video_previews if video_previews and video_ok else image_previews
-        # todo - sort by size or other metrics here
-        if filtered_list:
-            preview_to_send = filtered_list[0]
-            mime_types = {'image': 'image/jpeg', 'video': 'video/mp4'}
-            file_mime_type = mime_types.get(preview_to_send['kind'], 'unknown')
-            return send_file(preview_to_send['filename'], mimetype=file_mime_type)
-    except Exception as e:
-        logger.error(f'Error getting thumbnail: {e}')
-        return f'Error getting thumbnail: {e}', 500
-    return "No thumbnail available", 404
-
-
-# Get job file routing
-@server.route('/api/job/<job_id>/file/<filename>', methods=['GET'])
-def get_job_file(job_id, filename):
-    found_job = RenderQueue.job_with_id(job_id)
-    try:
-        for full_path in found_job.file_list():
-            if filename in full_path:
-                return send_file(path_or_file=full_path)
-    except FileNotFoundError:
-        abort(404)
 
 
 @server.get('/api/jobs/<status_val>')
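Reviewer note: /api/jobs_long_poll holds the request open for up to roughly 30 seconds and answers 204 when nothing changed for the supplied token. A hedged client loop, again assuming requests and the same placeholder base URL, might look like this sketch.

    import requests

    BASE_URL = "http://localhost:8080"  # assumed, illustration only

    def wait_for_job_changes(token):
        # Re-issue the long poll until the server reports a change;
        # 204 means "no change within the ~30s window", so just retry.
        while True:
            resp = requests.get(f"{BASE_URL}/api/jobs_long_poll",
                                params={"token": token}, timeout=45)
            if resp.status_code == 204:
                continue
            resp.raise_for_status()
            return resp.json()  # presumably the refreshed job list and a new token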
@@ -159,20 +100,33 @@ def filtered_jobs_json(status_val):
     return f'Cannot find jobs with status {status_val}', 400
 
 
-@server.post('/api/job/<job_id>/send_subjob_update_notification')
-def subjob_update_notification(job_id):
-    subjob_details = request.json
-    DistributedJobManager.handle_subjob_update_notification(RenderQueue.job_with_id(job_id), subjob_data=subjob_details)
-    return Response(status=200)
+# --------------------------------------------
+# Job Details / File Handling
+# --------------------------------------------
 
 
 @server.get('/api/job/<job_id>')
-def get_job_status(job_id):
+def get_job_details(job_id):
+    """Retrieves the details of a requested job in JSON format.
+
+    Args:
+        job_id (str): The ID of the render job.
+
+    Returns:
+        dict: A JSON representation of the job's details.
+    """
     return RenderQueue.job_with_id(job_id).json()
 
 
 @server.get('/api/job/<job_id>/logs')
 def get_job_logs(job_id):
+    """Retrieves the log file for a specific render job.
+
+    Args:
+        job_id (str): The ID of the render job.
+
+    Returns:
+        Response: The log file's content as plain text, or an empty response if the log file is not found.
+    """
     found_job = RenderQueue.job_with_id(job_id)
     log_path = system_safe_path(found_job.log_path())
     log_data = None
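Reviewer note: the two endpoints documented above can be exercised together. The sketch below is an illustration only; it assumes requests, the placeholder base URL used earlier, and a hypothetical job id.

    import requests

    BASE_URL = "http://localhost:8080"  # assumed
    job_id = "example-job-id"           # hypothetical placeholder

    details = requests.get(f"{BASE_URL}/api/job/{job_id}", timeout=10)
    logs = requests.get(f"{BASE_URL}/api/job/{job_id}/logs", timeout=10)
    details.raise_for_status()
    print(details.json())               # full job record as JSON
    print(len(logs.text), "bytes of log text")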
@@ -188,7 +142,7 @@ def get_file_list(job_id):
 
 @server.route('/api/job/<job_id>/download')
-def download_file(job_id):
+def download_requested_file(job_id):
     requested_filename = request.args.get('filename')
     if not requested_filename:
@@ -203,7 +157,7 @@ def download_file(job_id):
 
 @server.route('/api/job/<job_id>/download_all')
-def download_all(job_id):
+def download_all_files(job_id):
     zip_filename = None
 
     @after_this_request
@@ -218,6 +172,7 @@ def download_all(job_id):
     found_job = RenderQueue.job_with_id(job_id)
     output_dir = os.path.dirname(found_job.output_path)
     if os.path.exists(output_dir):
+        from zipfile import ZipFile
         zip_filename = system_safe_path(os.path.join(tempfile.gettempdir(),
                                                      pathlib.Path(found_job.input_path).stem + '.zip'))
         with ZipFile(zip_filename, 'w') as zipObj:
@@ -229,6 +184,10 @@ def download_all(job_id):
     return f'Cannot find project files for job {job_id}', 500
 
 
+# --------------------------------------------
+# System Environment / Status
+# --------------------------------------------
 @server.get('/api/presets')
 def presets():
     presets_path = system_safe_path('config/presets.yaml')
@@ -260,13 +219,28 @@ def snapshot():
     return server_data
 
 
-@server.get('/api/_detected_clients')
-def detected_clients():
-    # todo: dev/debug only. Should not ship this - probably.
-    return ZeroconfServer.found_hostnames()
+@server.route('/api/status')
+def status():
+    return {"timestamp": datetime.now().isoformat(),
+            "system_os": current_system_os(),
+            "system_os_version": current_system_os_version(),
+            "system_cpu": current_system_cpu(),
+            "cpu_percent": psutil.cpu_percent(percpu=False),
+            "cpu_percent_per_cpu": psutil.cpu_percent(percpu=True),
+            "cpu_count": psutil.cpu_count(logical=False),
+            "memory_total": psutil.virtual_memory().total,
+            "memory_available": psutil.virtual_memory().available,
+            "memory_percent": psutil.virtual_memory().percent,
+            "job_counts": RenderQueue.job_counts(),
+            "hostname": server.config['HOSTNAME'],
+            "port": server.config['PORT']
+            }
-# New version
+# --------------------------------------------
+# Job Lifecycle (Create, Cancel, Delete)
+# --------------------------------------------
 @server.post('/api/add_job')
 def add_job_handler():
     # Process request data
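Reviewer note: the relocated /api/status endpoint returns the host, CPU, memory, and job-count fields shown above, so a monitoring client can stay very small. The sketch below is illustrative only, with the same assumed base URL as the earlier examples.

    import requests

    BASE_URL = "http://localhost:8080"  # assumed
    status = requests.get(f"{BASE_URL}/api/status", timeout=10).json()
    print(f"{status['hostname']}:{status['port']} "
          f"cpu={status['cpu_percent']}% "
          f"mem={status['memory_percent']}% "
          f"jobs={status['job_counts']}")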
@@ -353,31 +327,9 @@ def delete_job(job_id):
         return f"Error deleting job: {e}", 500
 
 
-@server.get('/api/clear_history')
-def clear_history():
-    RenderQueue.clear_history()
-    return 'success'
-
-
-@server.route('/api/status')
-def status():
-    # Get system info
-    return {"timestamp": datetime.now().isoformat(),
-            "system_os": current_system_os(),
-            "system_os_version": current_system_os_version(),
-            "system_cpu": current_system_cpu(),
-            "cpu_percent": psutil.cpu_percent(percpu=False),
-            "cpu_percent_per_cpu": psutil.cpu_percent(percpu=True),
-            "cpu_count": psutil.cpu_count(logical=False),
-            "memory_total": psutil.virtual_memory().total,
-            "memory_available": psutil.virtual_memory().available,
-            "memory_percent": psutil.virtual_memory().percent,
-            "job_counts": RenderQueue.job_counts(),
-            "hostname": server.config['HOSTNAME'],
-            "port": server.config['PORT']
-            }
+# --------------------------------------------
+# Engine Info and Management:
+# --------------------------------------------
 
 
 @server.get('/api/renderer_info')
 def renderer_info():
@@ -499,35 +451,95 @@ def get_renderer_help(renderer):
     return f"Cannot find renderer '{renderer}'", 400
 
 
+# --------------------------------------------
+# Miscellaneous:
+# --------------------------------------------
+@server.post('/api/job/<job_id>/send_subjob_update_notification')
+def subjob_update_notification(job_id):
+    subjob_details = request.json
+    DistributedJobManager.handle_subjob_update_notification(RenderQueue.job_with_id(job_id), subjob_data=subjob_details)
+    return Response(status=200)
+
+
+@server.route('/api/job/<job_id>/thumbnail')
+def job_thumbnail(job_id):
+    try:
+        big_thumb = request.args.get('size', False) == "big"
+        video_ok = request.args.get('video_ok', False)
+        found_job = RenderQueue.job_with_id(job_id, none_ok=False)
+        # trigger a thumbnail update - just in case
+        PreviewManager.update_previews_for_job(found_job, wait_until_completion=True, timeout=60)
+        previews = PreviewManager.get_previews_for_job(found_job)
+        all_previews_list = previews.get('output', previews.get('input', []))
+        video_previews = [x for x in all_previews_list if x['kind'] == 'video']
+        image_previews = [x for x in all_previews_list if x['kind'] == 'image']
+        filtered_list = video_previews if video_previews and video_ok else image_previews
+        # todo - sort by size or other metrics here
+        if filtered_list:
+            preview_to_send = filtered_list[0]
+            mime_types = {'image': 'image/jpeg', 'video': 'video/mp4'}
+            file_mime_type = mime_types.get(preview_to_send['kind'], 'unknown')
+            return send_file(preview_to_send['filename'], mimetype=file_mime_type)
+    except Exception as e:
+        logger.error(f'Error getting thumbnail: {e}')
+        return f'Error getting thumbnail: {e}', 500
+    return "No thumbnail available", 404
+
+
+# --------------------------------------------
+# System Benchmarks:
+# --------------------------------------------
 @server.get('/api/cpu_benchmark')
 def get_cpu_benchmark_score():
+    from src.utilities.benchmark import cpu_benchmark
     return str(cpu_benchmark(10))
 
 
 @server.get('/api/disk_benchmark')
 def get_disk_benchmark():
+    from src.utilities.benchmark import disk_io_benchmark
     results = disk_io_benchmark()
     return {'write_speed': results[0], 'read_speed': results[-1]}
 
 
-def start_server(hostname=None):
-    # get hostname
-    if not hostname:
-        local_hostname = socket.gethostname()
-        hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "")
-    # load flask settings
-    server.config['HOSTNAME'] = hostname
-    server.config['PORT'] = int(Config.port_number)
-    server.config['UPLOAD_FOLDER'] = system_safe_path(os.path.expanduser(Config.upload_folder))
-    server.config['MAX_CONTENT_PATH'] = Config.max_content_path
-    server.config['enable_split_jobs'] = Config.enable_split_jobs
-    # disable most Flask logging
-    flask_log = logging.getLogger('werkzeug')
-    flask_log.setLevel(Config.flask_log_level.upper())
-    logger.debug('Starting API server')
-    server.run(host='0.0.0.0', port=server.config['PORT'], debug=Config.flask_debug_enable, use_reloader=False,
-               threaded=True)
+# --------------------------------------------
+# Error Handlers:
+# --------------------------------------------
+@server.errorhandler(JobNotFoundError)
+def handle_job_not_found(job_error):
+    return str(job_error), 400
+
+
+@server.errorhandler(DetachedInstanceError)
+def handle_detached_instance(_):
+    return "Unavailable", 503
+
+
+@server.errorhandler(Exception)
+def handle_general_error(general_error):
+    err_msg = f"Server error: {general_error}"
+    logger.error(err_msg)
+    return err_msg, 500
+
+
+# --------------------------------------------
+# Debug / Development Only:
+# --------------------------------------------
+@server.get('/api/_debug/detected_clients')
+def detected_clients():
+    # todo: dev/debug only. Should not ship this - probably.
+    from src.utilities.zeroconf_server import ZeroconfServer
+    return ZeroconfServer.found_hostnames()
+
+
+@server.get('/api/_debug/clear_history')
+def clear_history():
+    RenderQueue.clear_history()
+    return 'success'
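Reviewer note: the relocated error handlers map JobNotFoundError to 400, a transient DetachedInstanceError to 503, and any other exception to 500, so a client can key its retry logic off those codes. A hedged sketch with the same assumptions as the earlier examples follows.

    import time

    import requests

    BASE_URL = "http://localhost:8080"  # assumed

    def get_job_or_none(job_id, retries=3):
        # 503 signals a transient detached-session error, so retry briefly;
        # 400 signals an unknown job id; anything else >= 400 is surfaced.
        for _ in range(retries):
            resp = requests.get(f"{BASE_URL}/api/job/{job_id}", timeout=10)
            if resp.status_code == 503:
                time.sleep(1)
                continue
            if resp.status_code == 400:
                return None
            resp.raise_for_status()
            return resp.json()
        return None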