diff --git a/config.yaml b/config.yaml index 691a2bf..95067b1 100644 --- a/config.yaml +++ b/config.yaml @@ -1,4 +1,5 @@ upload_folder: "/Users/brett/Desktop/zordon-uploads/" max_content_path: 100000000 flask_log_level: error +flask_debug_enable: false queue_eval_seconds: 1 \ No newline at end of file diff --git a/zordon_server.py b/zordon_server.py index 1094cef..7be8193 100755 --- a/zordon_server.py +++ b/zordon_server.py @@ -1,21 +1,23 @@ #!/usr/bin/env python +import argparse +import json import logging +import os import platform import socket import threading import time import uuid from datetime import datetime + import psutil import requests import yaml -import json -import os -from flask import Flask, jsonify, request +from flask import Flask, jsonify, request, render_template +from werkzeug.utils import secure_filename from utilities.render_worker import RenderWorkerFactory, RenderStatus, string_to_status -data = 'foo' app = Flask(__name__) logger = logging.getLogger() @@ -90,8 +92,10 @@ class RenderServer: render_queue = [] render_clients = [] maximum_renderer_instances = {'Blender': 2, 'After Effects': 1, 'ffmpeg': 4} - host_name = socket.gethostname() + host_name = None port = 8080 + client_mode = False + server_hostname = None last_saved_counts = {} @@ -99,10 +103,11 @@ class RenderServer: pass @classmethod - def add_to_render_queue(cls, render_job, force_start=False): + def add_to_render_queue(cls, render_job, force_start=False, client=None): - if not render_job.client: + if not client or render_job.client == cls.host_name: logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job.render)) + render_job.client = cls.host_name cls.render_queue.append(render_job) if force_start: cls.start_job(render_job) @@ -271,15 +276,73 @@ class RenderServer: @classmethod def register_client(cls, hostname): - # todo: register this - response = requests.get("https://{}/_register_".format(hostname)) - print(response.status_code) - pass + + success = False + + if hostname in cls.render_clients: + logger.warning(f"Client '{hostname}' already registered") + return success + + try: + response = requests.get(f"https://{hostname}/status", timeout=1) + if response.ok: + cls.render_clients.append(hostname) + logger.info(f"Client '{hostname}' successfully registered") + success = True + except requests.exceptions.ConnectTimeout as e: + logger.error(f"Cannot connect to client at hostname: {hostname}") + return success @classmethod def unregister_client(cls): pass + @classmethod + def start(cls, background_thread=False): + + def eval_loop(delay_sec=1): + while True: + cls.evaluate_queue() + time.sleep(delay_sec) + + logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S', + level=logging.INFO) + + with open('config.yaml') as f: + config = yaml.load(f, Loader=yaml.FullLoader) + + app.config['UPLOAD_FOLDER'] = config['upload_folder'] + app.config['MAX_CONTENT_PATH'] = config['max_content_path'] + + # Get hostname and render clients + cls.host_name = socket.gethostname() + if not RenderServer.render_clients: + cls.render_clients = [RenderServer.host_name] + + if not cls.client_mode: + logger.info(f"Starting Zordon in Server Mode - {cls.host_name}") + else: + logger.info(f"Starting Zordon in Client Mode - {cls.host_name}") + + # disable most Flask logging + flask_log = logging.getLogger('werkzeug') + flask_log.setLevel(config.get('flask_log_level', 'ERROR').upper()) + + # Setup the RenderServer object + cls.load_history() + + thread = threading.Thread(target=eval_loop, kwargs={'delay_sec': config.get('queue_eval_seconds', 1)}, daemon=True) + thread.start() + + if background_thread: + server_thread = threading.Thread( + target=lambda: app.run(host=cls.host_name, port=cls.port, debug=False, use_reloader=False)) + server_thread.start() + server_thread.join() + else: + app.run(host=cls.host_name, port=cls.port, debug=config.get('flask_debug_enable', False), + use_reloader=False) + @app.get('/jobs') def jobs_json(): @@ -290,33 +353,51 @@ def jobs_json(): def filtered_jobs_json(status_val): state = string_to_status(status_val) jobs = [json.loads(x.json()) for x in RenderServer.jobs_with_status(state)] - return jsonify(jobs) + return jobs + + +@app.get('/job_status/') +def get_job_status(job_id): + found_job = RenderServer.job_with_id(job_id) + if found_job: + return found_job.json() + else: + return None + + +@app.post('/register_client') +def register_client(): + client_hostname = request.values['hostname'] + x = RenderServer.register_client(client_hostname) + return "Success" if x else "Fail" @app.get('/full_status') def full_status(): full_results = {'timestamp': datetime.now().isoformat(), 'servers': {}} - # todo: iterate through all servers - - server_list = [socket.gethostname()] - try: - for server_hostname in server_list: - if server_hostname == local_hostname: - server_status = RenderServer.status() - server_jobs = [json.loads(x.json()) for x in RenderServer.render_queue if not x.archived] + for client_hostname in RenderServer.render_clients: + if client_hostname == local_hostname: + snapshot_results = snapshot() else: - server_status = requests.get(f'http://{server_hostname}:8080/status', timeout=1).json() - server_jobs = requests.get(f'http://{server_hostname}:8080/jobs', timeout=1).json() - server_data = {'status': server_status, 'jobs': server_jobs} - full_results['servers'][server_hostname] = server_data + snapshot_results = requests.get(f'http://{client_hostname}:8080/snapshot', timeout=1).json() + server_data = {'status': snapshot_results.get('status'), 'jobs': snapshot_results.get('jobs')} + full_results['servers'][client_hostname] = server_data except Exception as e: logger.error(f"Exception fetching full status: {e}") return full_results +@app.get('/snapshot') +def snapshot(): + server_status = RenderServer.status() + server_jobs = [json.loads(x.json()) for x in RenderServer.render_queue if not x.archived] + server_data = {'status': server_status, 'jobs': server_jobs, 'timestamp': datetime.now().isoformat()} + return server_data + + @app.post('/add_job') def add_job(): """Create new job and add to server render queue""" @@ -327,16 +408,6 @@ def add_job(): args = request.json.get('args', None) force_start = request.json.get('force_start', False) - return add_job_handler(renderer, input_path, output_path, args=args, priority=priority, force_start=force_start) - - -def add_job_handler(renderer, input_path, output_path, args=None, priority=2, force_start=False): - - if not os.path.exists(input_path): - err_msg = f"Cannot add job. Cannot find input file: {input_path}" - logger.error(err_msg) - return {"error": err_msg}, 400 - try: render_job = RenderWorkerFactory.create_worker(renderer, input_path, output_path, args) except ValueError as e: @@ -399,25 +470,38 @@ def upload_file(): if not uploaded_file.filename: return {'error': 'no file uploaded'} - with open('config.yaml') as f: - config = yaml.load(f, Loader=yaml.FullLoader) + # generate directory to use + logger.debug(f"Receiving uploaded file {uploaded_file.filename}") + new_id = RenderJob.generate_id() + job_dir = os.path.join(app.config['UPLOAD_FOLDER'], new_id + "-" + uploaded_file.filename) + if not os.path.exists(job_dir): + os.makedirs(job_dir) - # disable most Flask logging - flask_log = logging.getLogger('werkzeug') - flask_log.setLevel(logging.ERROR) + local_path = os.path.join(job_dir, secure_filename(uploaded_file.filename)) + uploaded_file.save(local_path) + renderer = request.values['renderer'] - logging.basicConfig(format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S', level=logging.INFO) + # todo: finish output_path and args - currently placeholder data + output_path = os.path.join(job_dir, uploaded_file.filename + "-output.mp4") - # Setup the RenderServer object - RenderServer.load_history() - RenderServer.evaluate_queue() + if renderer == 'blender': + args = {'engine': request.values['blender_engine']} + else: + args = None - thread = threading.Thread(target=eval_loop, daemon=True) - thread.start() - server_thread = threading.Thread(target=lambda: app.run(host=RenderServer.host_name, port=RenderServer.port, debug=False, use_reloader=False)) - server_thread.start() - if not background_thread: - server_thread.join() + try: + render_job = RenderWorkerFactory.create_worker(renderer, local_path, output_path, args=args) + new_job = RenderJob(render_job, custom_id=new_id) + RenderServer.add_to_render_queue(new_job) + return new_job.json() + except ValueError as e: + logger.exception(e) + return {'error': str(e)}, 400 + + except Exception as e: + logger.exception(e) + + return {'error': 'unknown error'} if __name__ == '__main__':