#!/usr/bin/env python3
import json
import logging
import os
import pathlib
import platform
import shutil
import socket
import ssl
import threading
import time
import zipfile
from datetime import datetime
from urllib.request import urlretrieve
from zipfile import ZipFile

import json2html
import psutil
import yaml
from flask import Flask, request, render_template, send_file, after_this_request, Response, redirect, url_for, abort
from werkzeug.utils import secure_filename

from lib.render_queue import RenderQueue, JobNotFoundError
from lib.server.server_proxy import RenderServerProxy
from lib.server.zeroconf_server import ZeroconfServer
from lib.utilities.server_helper import generate_thumbnail_for_job
from lib.workers.base_worker import string_to_status, RenderStatus
from lib.workers.worker_factory import RenderWorkerFactory

logger = logging.getLogger()
server = Flask(__name__, template_folder='templates', static_folder='static')
ssl._create_default_https_context = ssl._create_unverified_context  # disable SSL verification for downloads

# Order in which status groups are listed when jobs are not sorted purely by date.
categories = [RenderStatus.RUNNING, RenderStatus.ERROR, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED,
              RenderStatus.COMPLETED, RenderStatus.CANCELLED]


def sorted_jobs(all_jobs, sort_by_date=True):
    """Return jobs ordered newest-first, optionally grouped by status category.

    Args:
        all_jobs: iterable of job objects exposing ``date_created`` and ``status``.
        sort_by_date: when True, sort every job by ``date_created`` descending. When False,
            group jobs in the order given by ``categories`` and sort each group newest-first.

    Returns:
        A list of jobs.
    """
    if not sort_by_date:
        sorted_job_list = []
        if all_jobs:
            for status_category in categories:
                # NOTE(review): other code in this module compares job.status against RenderStatus
                # members directly, while this compares against `.value` - confirm which form
                # job.status actually takes.
                found_jobs = [x for x in all_jobs if x.status == status_category.value]
                if found_jobs:
                    sorted_found_jobs = sorted(found_jobs, key=lambda d: d.date_created, reverse=True)
                    sorted_job_list.extend(sorted_found_jobs)
    else:
        sorted_job_list = sorted(all_jobs, key=lambda d: d.date_created, reverse=True)
    return sorted_job_list


@server.route('/')
@server.route('/index')
def index():
    """Render the main page listing all jobs, renderer info and the presets from config/presets.yaml."""
    with open('config/presets.yaml') as f:
        presets = yaml.load(f, Loader=yaml.FullLoader)

    return render_template('index.html', all_jobs=sorted_jobs(RenderQueue.all_jobs()),
                           hostname=server.config['HOSTNAME'], renderer_info=renderer_info(),
                           render_clients=[server.config['HOSTNAME']], preset_list=presets)


@server.get('/api/jobs')
def jobs_json():
    """Return all jobs as JSON together with a cache token.

    If the caller passes a ``token`` query argument equal to the current token, an empty
    204 response is returned instead of the job list.
    """
    try:
        hash_token = request.args.get('token', None)
        all_jobs = [x.json() for x in RenderQueue.all_jobs()]
        job_cache_token = str(json.dumps(all_jobs).__hash__())

        if hash_token and hash_token == job_cache_token:
            return [], 204  # no need to update
        else:
            return {'jobs': all_jobs, 'token': job_cache_token}
    except Exception as e:
        logger.exception(f"Exception fetching all_jobs_cached: {e}")
        return [], 500


# The URL variable is required so Flask can supply the `job_id` argument of the view.
@server.route('/ui/job/<job_id>/full_details')
def job_detail(job_id):
    """Render the details page for a single job, including an HTML table of its JSON data."""
    found_job = RenderQueue.job_with_id(job_id)
    table_html = json2html.json2html.convert(json=found_job.json(),
                                             table_attributes='class="table is-narrow is-striped is-fullwidth"')
    media_url = None
    if found_job.file_list() and found_job.status == RenderStatus.COMPLETED:
        media_basename = os.path.basename(found_job.file_list()[0])
        media_url = f"/api/job/{job_id}/file/{media_basename}"
    return render_template('details.html', detail_table=table_html, media_url=media_url,
                           hostname=server.config['HOSTNAME'],
                           job_status=found_job.status.value.title(), job=found_job,
                           renderer_info=renderer_info())


@server.route('/api/job/<job_id>/thumbnail')
def job_thumbnail(job_id):
    """Serve a thumbnail (video or image) for a job, falling back to a status icon.

    Query arguments:
        size: ``big`` selects the large thumbnail.
        video_ok: any non-empty value allows a video thumbnail to be returned.
    """
    big_thumb = request.args.get('size', False) == "big"
    video_ok = request.args.get('video_ok', False)
    found_job = RenderQueue.job_with_id(job_id, none_ok=True)
    if found_job:

        os.makedirs(server.config['THUMBS_FOLDER'], exist_ok=True)
        thumb_video_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.mp4')
        thumb_image_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '.jpg')
        big_video_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '_big.mp4')
        big_image_path = os.path.join(server.config['THUMBS_FOLDER'], found_job.id + '_big.jpg')

        can_generate = found_job.status not in [RenderStatus.CANCELLED, RenderStatus.ERROR]

        # generate regular thumb if it doesn't exist
        if not os.path.exists(thumb_video_path) and not os.path.exists(thumb_video_path + '_IN-PROGRESS') and \
                can_generate:
            generate_thumbnail_for_job(found_job, thumb_video_path, thumb_image_path, max_width=240)

        # generate big thumb if it doesn't exist
        # The in-progress marker is checked on the video path, matching every other check in this view
        # (this guard previously looked at big_image_path + '_IN-PROGRESS').
        if not os.path.exists(big_video_path) and not os.path.exists(big_video_path + '_IN-PROGRESS') and \
                can_generate:
            generate_thumbnail_for_job(found_job, big_video_path, big_image_path, max_width=800)

        # generated videos
        if video_ok:
            if big_thumb and os.path.exists(big_video_path) and not os.path.exists(
                    big_video_path + '_IN-PROGRESS'):
                return send_file(big_video_path, mimetype="video/mp4")
            elif os.path.exists(thumb_video_path) and not os.path.exists(thumb_video_path + '_IN-PROGRESS'):
                return send_file(thumb_video_path, mimetype="video/mp4")

        # Generated thumbs
        if big_thumb and os.path.exists(big_image_path):
            return send_file(big_image_path, mimetype='image/jpeg')
        elif os.path.exists(thumb_image_path):
            return send_file(thumb_image_path, mimetype='image/jpeg')

        # Misc status icons
        if found_job.status == RenderStatus.RUNNING:
            return send_file('static/images/gears.png', mimetype="image/png")
        elif found_job.status == RenderStatus.CANCELLED:
            return send_file('static/images/cancelled.png', mimetype="image/png")
        elif found_job.status == RenderStatus.SCHEDULED:
            return send_file('static/images/scheduled.png', mimetype="image/png")
        elif found_job.status == RenderStatus.NOT_STARTED:
            return send_file('static/images/not_started.png', mimetype="image/png")
    # errors
    return send_file('static/images/error.png', mimetype="image/png")


# Get job file routing
@server.route('/api/job/<job_id>/file/<filename>', methods=['GET'])
def get_job_file(job_id, filename):
    """Send the first file of the job whose path contains ``filename``; respond 404 if none matches."""
    found_job = RenderQueue.job_with_id(job_id)
    try:
        for full_path in found_job.file_list():
            # NOTE(review): substring match - a partial name may match an unintended file.
            if filename in full_path:
                return send_file(path_or_file=full_path)
    except FileNotFoundError:
        abort(404)
    # No file matched: previously the view fell through and returned None, which Flask rejects.
    abort(404)


@server.get('/api/jobs/<status_val>')
def filtered_jobs_json(status_val):
    """Return the JSON of every job with the given status, or a 400 message if there are none."""
    state = string_to_status(status_val)
    jobs = [x.json() for x in RenderQueue.jobs_with_status(state)]
    if jobs:
        return jobs
    else:
        return f'Cannot find jobs with status {status_val}', 400


@server.errorhandler(JobNotFoundError)
def handle_job_not_found(job_error):
    """Turn an uncaught JobNotFoundError into a 400 response naming the missing job ID."""
    return f'Cannot find job with ID {job_error.job_id}', 400


@server.get('/api/job/<job_id>')
def get_job_status(job_id):
    """Return the JSON representation of a single job."""
    return RenderQueue.job_with_id(job_id).json()


@server.get('/api/job/<job_id>/logs')
def get_job_logs(job_id):
    """Return the job's log file contents as plain text (empty body if no log file exists)."""
    found_job = RenderQueue.job_with_id(job_id)
    log_path = found_job.log_path()
    log_data = None
    if log_path and os.path.exists(log_path):
        with open(log_path) as file:
            log_data = file.read()
    return Response(log_data, mimetype='text/plain')


@server.get('/api/job/<job_id>/file_list')
def get_file_list(job_id):
    """Return the job's file list."""
    return RenderQueue.job_with_id(job_id).file_list()


@server.get('/api/job/<job_id>/make_ready')
def make_job_ready(job_id):
    """Mark a NOT_READY / NOT_STARTED job as NOT_STARTED and forward the request to its children.

    Returns:
        ``(job_json, 200)`` on success, ``(message, 500)`` on error, or ``(message, 405)`` when
        the job is in any other state.
    """
    try:
        found_job = RenderQueue.job_with_id(job_id)
        if found_job.status in [RenderStatus.NOT_READY, RenderStatus.NOT_STARTED]:
            if found_job.children:
                for hostname, child_id in found_job.children.items():
                    # Address each child by its own ID on the remote host.
                    RenderServerProxy(hostname).request_data(f'/api/job/{child_id}/make_ready')
            found_job.status = RenderStatus.NOT_STARTED
            RenderQueue.save_state()
            return found_job.json(), 200
    except Exception as e:
        # f-string prefix was missing, so the literal text "{e}" was returned.
        return f"Error making job ready: {e}", 500
    return "Not valid command", 405


@server.route('/api/job/<job_id>/download_all')
def download_all(job_id):
    """Zip every entry of the job's output directory and send it as an attachment.

    The temporary zip file is removed after the request via ``after_this_request``.
    """
    zip_filename = None

    @after_this_request
    def clear_zip(response):
        if zip_filename and os.path.exists(zip_filename):
            os.remove(zip_filename)
        return response

    found_job = RenderQueue.job_with_id(job_id)
    output_dir = os.path.dirname(found_job.output_path)
    if os.path.exists(output_dir):
        zip_filename = os.path.join('/tmp', pathlib.Path(found_job.input_path).stem + '.zip')
        with ZipFile(zip_filename, 'w') as zipObj:
            for f in os.listdir(output_dir):
                zipObj.write(filename=os.path.join(output_dir, f),
                             arcname=os.path.basename(f))
        # "zip" alone is not a valid MIME type.
        return send_file(zip_filename, mimetype="application/zip", as_attachment=True, )
    else:
        return f'Cannot find project files for job {job_id}', 500


@server.get('/api/presets')
def presets():
    """Return the contents of config/presets.yaml."""
    with open('config/presets.yaml') as f:
        presets = yaml.load(f, Loader=yaml.FullLoader)
    return presets


@server.get('/api/full_status')
def full_status():
    """Return a timestamped status report keyed by hostname (only this server is included)."""
    full_results = {'timestamp': datetime.now().isoformat(), 'servers': {}}

    try:
        snapshot_results = snapshot()
        server_data = {'status': snapshot_results.get('status', {}), 'jobs': snapshot_results.get('jobs', {}),
                       'is_online': True}
        full_results['servers'][server.config['HOSTNAME']] = server_data
    except Exception as e:
        logger.error(f"Exception fetching full status: {e}")

    return full_results


@server.get('/api/snapshot')
def snapshot():
    """Return this server's status, the JSON of all jobs and a timestamp."""
    server_status = status()
    server_jobs = [x.json() for x in RenderQueue.all_jobs()]
    server_data = {'status': server_status, 'jobs': server_jobs, 'timestamp': datetime.now().isoformat()}
    return server_data


@server.get('/api/_detected_clients')
def detected_clients():
    # todo: dev/debug only. Should not ship this - probably.
    return server.config['ZEROCONF_SERVER'].found_clients()


@server.route('/api/is_available_for_job', methods=['POST', 'GET'])
def available_for_job():
    """
    Check queue to see if it can take a job with a given renderer and priority
    """
    renderer = request.args.get('renderer')
    priority = request.args.get('priority')

    if not renderer or not priority:
        return {"error": "Both 'renderer' and 'priority' parameters are required"}, 400
    elif renderer not in RenderWorkerFactory.supported_renderers():
        return {"error": f"Unsupported renderer: {renderer}"}, 400
    else:
        return {"is_available": RenderQueue.is_available_for_job(renderer, priority),
                'renderer': renderer, 'priority': priority}, 200


@server.post('/api/add_job')
def add_job_handler():
    """Create one or more render jobs from a JSON body, a ``json`` form field, or flat form data.

    The project file comes from an uploaded ``file``, a ``url`` to download, or a ``local_path``
    (taken from the first job entry). Zip archives are extracted and must contain exactly one
    candidate project file.

    Returns:
        ``(results, 200)`` or a redirect to the index on success; an error message or the
        results list with a 4xx/5xx status otherwise.
    """
    # initial handling of raw data
    try:
        if request.is_json:
            jobs_list = [request.json] if not isinstance(request.json, list) else request.json
        elif request.form.get('json', None):
            jobs_list = json.loads(request.form['json'])
            # Accept a single JSON object here too, as the request.json path does.
            if not isinstance(jobs_list, list):
                jobs_list = [jobs_list]
        else:
            # Cleanup flat form data into nested structure
            form_dict = {k: v for k, v in dict(request.form).items() if v}
            args = {}
            arg_keys = [k for k in form_dict.keys() if '-arg_' in k]
            for arg_key in arg_keys:
                if form_dict['renderer'] in arg_key or 'AnyRenderer' in arg_key:
                    cleaned_key = arg_key.split('-arg_')[-1]
                    args[cleaned_key] = form_dict[arg_key]
                form_dict.pop(arg_key)
            args['raw'] = form_dict.get('raw_args', None)
            form_dict['args'] = args
            jobs_list = [form_dict]
    except Exception as e:
        err_msg = f"Error processing job data: {e}"
        logger.error(err_msg)
        return err_msg, 500

    # An empty list would otherwise surface as an IndexError and an 'unknown error' 500 below.
    if not jobs_list:
        return "No job data provided", 400

    # start handling project files
    try:
        # handle uploaded files
        logger.debug(f"Incoming new job request: {jobs_list}")
        uploaded_project = request.files.get('file', None)
        project_url = jobs_list[0].get('url', None)
        local_path = jobs_list[0].get('local_path', None)
        renderer = jobs_list[0].get('renderer')

        downloaded_file_url = None
        if uploaded_project and uploaded_project.filename:
            referred_name = os.path.basename(uploaded_project.filename)
        elif project_url:
            # download and save url - have to download first to know filename due to redirects
            logger.info(f"Attempting to download URL: {project_url}")
            try:
                downloaded_file_url, info = urlretrieve(project_url)
                referred_name = info.get_filename() or os.path.basename(project_url)
            except Exception as e:
                err_msg = f"Error downloading file: {e}"
                logger.error(err_msg)
                return err_msg, 406
        elif local_path and os.path.exists(local_path):
            referred_name = os.path.basename(local_path)
        else:
            return "Cannot find any valid project paths", 400

        # prep local filepath
        job_dir = os.path.join(server.config['UPLOAD_FOLDER'], '_'.join(
            [datetime.now().strftime("%Y.%m.%d_%H.%M.%S"), renderer, os.path.splitext(referred_name)[0]]))
        os.makedirs(job_dir, exist_ok=True)
        upload_dir = os.path.join(job_dir, 'source')
        os.makedirs(upload_dir, exist_ok=True)

        # move projects to their work directories
        loaded_project_local_path = None
        if uploaded_project and uploaded_project.filename:
            loaded_project_local_path = os.path.join(upload_dir, secure_filename(uploaded_project.filename))
            uploaded_project.save(loaded_project_local_path)
            logger.info(f"Transfer complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}")
        elif project_url:
            loaded_project_local_path = os.path.join(upload_dir, referred_name)
            shutil.move(downloaded_file_url, loaded_project_local_path)
            logger.info(f"Download complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}")
        elif local_path:
            loaded_project_local_path = os.path.join(upload_dir, referred_name)
            shutil.copy(local_path, loaded_project_local_path)
            logger.info(f"Import complete for {loaded_project_local_path.split(server.config['UPLOAD_FOLDER'])[-1]}")

        # process uploaded zip files
        zip_path = loaded_project_local_path if loaded_project_local_path.lower().endswith('.zip') else None
        if zip_path:
            work_path = os.path.dirname(zip_path)
            try:
                with zipfile.ZipFile(zip_path, 'r') as myzip:
                    myzip.extractall(os.path.dirname(zip_path))

                project_files = [x for x in os.listdir(work_path) if os.path.isfile(os.path.join(work_path, x))]
                project_files = [x for x in project_files if '.zip' not in x]
                supported_exts = RenderWorkerFactory.class_for_name(renderer).engine.supported_extensions
                if supported_exts:
                    project_files = [file for file in project_files if
                                     any(file.endswith(ext) for ext in supported_exts)]

                if len(project_files) != 1:  # we have to narrow down to 1 main project file, otherwise error
                    return {'error': f'Cannot find valid project file in {os.path.basename(zip_path)}'}, 400

                extracted_project_path = os.path.join(work_path, project_files[0])
                logger.info(f"Extracted zip file to {extracted_project_path}")
                loaded_project_local_path = extracted_project_path
            except (zipfile.BadZipFile, zipfile.LargeZipFile) as e:
                err_msg = f"Error processing zip file: {e}"
                logger.error(err_msg)
                return err_msg, 500

        # create and add jobs to render queue
        results = []
        for job_data in jobs_list:
            try:
                # prepare output paths
                output_dir = os.path.join(job_dir, job_data.get('name') if len(jobs_list) > 1 else 'output')
                os.makedirs(output_dir, exist_ok=True)

                # get new output path in output_dir
                job_data['output_path'] = os.path.join(output_dir, os.path.basename(
                    job_data.get('output_path', None) or loaded_project_local_path
                ))

                # create & configure jobs
                worker = RenderWorkerFactory.create_worker(renderer=job_data['renderer'],
                                                           input_path=loaded_project_local_path,
                                                           output_path=job_data["output_path"],
                                                           args=job_data.get('args', {}))
                worker.status = job_data.get("initial_status", worker.status)
                worker.parent = job_data.get("parent", worker.parent)
                worker.name = job_data.get("name", worker.name)
                worker.priority = int(job_data.get('priority', worker.priority))
                worker.start_frame = int(job_data.get("start_frame", worker.start_frame))
                worker.end_frame = int(job_data.get("end_frame", worker.end_frame))

                # determine if we can / should split the job
                if server.config.get('enable_split_jobs', False) and (worker.total_frames > 1) and not worker.parent:
                    create_subjobs(worker, job_data, zip_path or loaded_project_local_path)

                RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
                make_job_ready(worker.id)
                results.append(worker.json())
            except Exception as e:
                err_msg = f"Error creating render job: {e}"
                logger.error(err_msg)
                results.append({'error': err_msg})

        # return any errors from results list
        for response in results:
            if response.get('error', None):
                return results, 400

        # redirect to index if requested
        if request.args.get('redirect', False):
            return redirect(url_for('index'))
        else:
            return results, 200

    except Exception as e:
        logger.exception(f"Unknown error adding job: {e}")
        return 'unknown error', 500


def create_subjobs(worker, job_data, project_path):
    """Split ``worker``'s frame range across available servers and post sub-jobs to the remote ones.

    On success the local worker's ``end_frame`` and ``name`` are narrowed to its own share and
    ``worker.children`` maps each remote hostname to the created sub-job ID. If any submission
    fails, already-created sub-jobs are cancelled and the worker's frame range is left unchanged.

    Args:
        worker: the parent job; reads ``renderer``, ``priority``, ``start_frame``, ``end_frame``,
            ``id``, ``name`` and writes ``end_frame``, ``name``, ``children``.
        job_data: dict describing the job; copied for each sub-job.
        project_path: path of the project file (or zip) uploaded to the remote servers.
    """

    # Check availability
    local_hostname = server.config['HOSTNAME']
    found_servers = [x for x in server.config['ZEROCONF_SERVER'].found_clients() if local_hostname not in x]
    available_servers = [local_hostname] + [hostname for hostname in found_servers if
                                            RenderServerProxy(hostname).is_available_for_job(
                                                renderer=worker.renderer, priority=worker.priority)]

    if len(available_servers) <= 1:
        logger.debug("No available servers to split job with. Skipping subjob creation.")
        return

    logger.info(f"Found {len(available_servers) - 1} additional available servers | "
                f"Breaking up job into {len(available_servers)} jobs")
    logger.debug(f"Available servers: {available_servers}")

    def divide_frames(start_frame, end_frame, num_servers):
        """Split the inclusive range into up to ``num_servers`` contiguous (start, end) tuples."""
        frame_range = end_frame - start_frame + 1
        frames_per_server = frame_range // num_servers
        leftover_frames = frame_range % num_servers

        ranges = []
        current_start = start_frame
        for i in range(num_servers):
            current_end = current_start + frames_per_server - 1
            # Hand out the remainder one frame at a time to the first servers.
            if leftover_frames > 0:
                current_end += 1
                leftover_frames -= 1
            if current_start <= current_end:
                ranges.append((current_start, current_end))
                current_start = current_end + 1

        return ranges

    # Calculate respective frames for each server
    server_frame_ranges = {}
    for idx, frame_range in enumerate(divide_frames(worker.start_frame, worker.end_frame, len(available_servers))):
        server_frame_ranges[available_servers[idx]] = frame_range

    logger.info(f"Job {worker.id} split plan: {server_frame_ranges}")

    # Prep and submit these sub-jobs
    submission_results = {}
    try:
        for server_hostname, frame_range in server_frame_ranges.items():
            if server_hostname != local_hostname:
                subjob = job_data.copy()
                subjob['name'] = f"{worker.name}[{frame_range[0]}-{frame_range[-1]}]"
                subjob['parent'] = f"{worker.id}@{local_hostname}"
                subjob['start_frame'] = frame_range[0]
                subjob['end_frame'] = frame_range[-1]

                logger.debug(f"Posting subjob with frames {subjob['start_frame']}-"
                             f"{subjob['end_frame']} to {server_hostname}")
                post_results = RenderServerProxy(server_hostname).post_job_to_server(
                    file_path=project_path, job_list=[subjob])
                if post_results.ok:
                    submission_results[server_hostname] = post_results.json()[0]
                else:
                    logger.error(f"Failed to create subjob on {server_hostname}")
                    break

        # check that job posts were all successful.
        if len(submission_results) != (len(server_frame_ranges) - 1):
            raise ValueError("Failed to create all subjobs")  # look into recalculating job numbers and use existing jobs

        # truncate parent render_job
        worker.end_frame = min(server_frame_ranges[local_hostname][-1], worker.end_frame)
        logger.info(f"Local job now rendering from {worker.start_frame} to {worker.end_frame}")

        # start subjobs
        logger.debug(f"Starting {len(server_frame_ranges) - 1} attempted subjobs")
        for hostname, results in submission_results.items():
            worker.children[hostname] = results['id']
        worker.name = f"{worker.name}[{worker.start_frame}-{worker.end_frame}]"

    except Exception as e:
        # cancel all the subjobs
        logger.error(f"Failed to split job into subjobs: {e}")
        logger.debug(f"Cancelling {len(server_frame_ranges) - 1} attempted subjobs")
        for hostname, results in submission_results.items():
            RenderServerProxy(hostname).cancel_job(results['id'], confirm=True)


@server.get('/api/job/<job_id>/cancel')
def cancel_job(job_id):
    """Cancel a job; requires a truthy ``confirm`` query argument."""
    if not request.args.get('confirm', False):
        return 'Confirmation required to cancel job', 400

    if RenderQueue.cancel_job(RenderQueue.job_with_id(job_id)):
        if request.args.get('redirect', False):
            return redirect(url_for('index'))
        else:
            return "Job cancelled"
    else:
        return "Unknown error", 500


@server.route('/api/job/<job_id>/delete', methods=['POST', 'GET'])
def delete_job(job_id):
    """Delete a job together with its output directory, thumbnails and (if unshared) project directory.

    Requires a truthy ``confirm`` query argument. Directories are only removed when their path
    contains the configured UPLOAD_FOLDER.
    """
    try:
        if not request.args.get('confirm', False):
            return 'Confirmation required to delete job', 400

        # Check if we can remove the 'output' directory
        found_job = RenderQueue.job_with_id(job_id)
        output_dir = os.path.dirname(found_job.output_path)
        if server.config['UPLOAD_FOLDER'] in output_dir and os.path.exists(output_dir):
            shutil.rmtree(output_dir)

        # Remove any thumbnails
        thumbs_folder = server.config['THUMBS_FOLDER']
        # The folder is only created when a thumbnail is first requested, so it may not exist yet.
        if os.path.isdir(thumbs_folder):
            for filename in os.listdir(thumbs_folder):
                if job_id in filename:
                    os.remove(os.path.join(thumbs_folder, filename))

        thumb_path = os.path.join(thumbs_folder, found_job.id + '.mp4')
        if os.path.exists(thumb_path):
            os.remove(thumb_path)

        # See if we own the project_dir (i.e. was it uploaded)
        project_dir = os.path.dirname(os.path.dirname(found_job.input_path))
        if server.config['UPLOAD_FOLDER'] in project_dir and os.path.exists(project_dir):
            # check to see if any other projects are sharing the same project file
            project_dir_files = [f for f in os.listdir(project_dir) if not f.startswith('.')]
            if len(project_dir_files) == 0 or (len(project_dir_files) == 1 and 'source' in project_dir_files[0]):
                logger.info(f"Removing project directory: {project_dir}")
                shutil.rmtree(project_dir)

        RenderQueue.delete_job(found_job)
        if request.args.get('redirect', False):
            return redirect(url_for('index'))
        else:
            return "Job deleted", 200

    except Exception as e:
        logger.error(f"Error deleting job: {e}")
        return f"Error deleting job: {e}", 500


@server.get('/api/clear_history')
def clear_history():
    """Clear the render queue's history."""
    RenderQueue.clear_history()
    return 'success'


@server.route('/api/status')
def status():
    """Return platform, CPU, memory, job-count and host/port information for this server."""
    return {"timestamp": datetime.now().isoformat(),
            "platform": platform.platform(),
            "cpu_percent": psutil.cpu_percent(percpu=False),
            "cpu_percent_per_cpu": psutil.cpu_percent(percpu=True),
            "cpu_count": psutil.cpu_count(),
            "memory_total": psutil.virtual_memory().total,
            "memory_available": psutil.virtual_memory().available,
            "memory_percent": psutil.virtual_memory().percent,
            "job_counts": RenderQueue.job_counts(),
            "hostname": server.config['HOSTNAME'],
            "port": server.config['PORT']
            }


@server.get('/api/renderer_info')
def renderer_info():
    """Return availability, version, supported extensions/formats and path for each supported renderer."""
    renderer_data = {}
    for r in RenderWorkerFactory.supported_renderers():
        engine = RenderWorkerFactory.class_for_name(r).engine
        engine_available = engine.renderer_path() is not None
        renderer_data[r] = {'available': engine_available,
                            'version': engine.version() if engine_available else None,
                            'supported_extensions': engine.supported_extensions,
                            'supported_export_formats': engine.get_output_formats() if engine_available else None,
                            'path': engine.renderer_path()}
    return renderer_data


@server.route('/upload')
def upload_file_page():
    """Render the upload page with the list of supported renderers."""
    return render_template('upload.html', supported_renderers=RenderWorkerFactory.supported_renderers())


def start_server(background_thread=False):
    """Load config/config.yaml, start the queue evaluation loop and Zeroconf service, and run Flask.

    Args:
        background_thread: when True, Flask runs in a separate thread which is then joined;
            otherwise it runs in the calling thread.

    The queue state is saved and the Zeroconf server stopped when the Flask server exits.
    """

    def eval_loop(delay_sec=1):
        while True:
            RenderQueue.evaluate_queue()
            time.sleep(delay_sec)

    with open('config/config.yaml') as f:
        config = yaml.load(f, Loader=yaml.FullLoader)

    logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S',
                        level=config.get('server_log_level', 'INFO').upper())

    # get hostname
    local_hostname = socket.gethostname()
    local_hostname = local_hostname + (".local" if not local_hostname.endswith(".local") else "")

    # load flask settings
    server.config['HOSTNAME'] = local_hostname
    server.config['PORT'] = int(config.get('port_number', 8080))
    server.config['UPLOAD_FOLDER'] = os.path.expanduser(config['upload_folder'])
    server.config['THUMBS_FOLDER'] = os.path.join(os.path.expanduser(config['upload_folder']), 'thumbs')
    server.config['MAX_CONTENT_PATH'] = config['max_content_path']
    server.config['enable_split_jobs'] = config.get('enable_split_jobs', False)

    # disable most Flask logging
    flask_log = logging.getLogger('werkzeug')
    flask_log.setLevel(config.get('flask_log_level', 'ERROR').upper())

    # Set up the RenderQueue object
    RenderQueue.load_state()

    thread = threading.Thread(target=eval_loop, kwargs={'delay_sec': config.get('queue_eval_seconds', 1)},
                              daemon=True)
    thread.start()

    logging.info(f"Starting Zordon Render Server - Hostname: '{server.config['HOSTNAME']}:'")
    zeroconf_server = ZeroconfServer("_zordon._tcp.local.", server.config['HOSTNAME'], server.config['PORT'])
    zeroconf_server.start()
    server.config['ZEROCONF_SERVER'] = zeroconf_server

    try:
        if background_thread:
            server_thread = threading.Thread(
                target=lambda: server.run(host='0.0.0.0', port=server.config['PORT'], debug=False,
                                          use_reloader=False))
            server_thread.start()
            server_thread.join()
        else:
            server.run(host='0.0.0.0', port=server.config['PORT'], debug=config.get('flask_debug_enable', False),
                       use_reloader=False, threaded=True)
    finally:
        RenderQueue.save_state()
        zeroconf_server.stop()