Add initial support for registering clients

This commit is contained in:
Brett Williams
2022-10-09 14:17:37 -07:00
parent 0f63aa629e
commit 9d6f7f4187
2 changed files with 134 additions and 49 deletions

View File

@@ -1,4 +1,5 @@
upload_folder: "/Users/brett/Desktop/zordon-uploads/" upload_folder: "/Users/brett/Desktop/zordon-uploads/"
max_content_path: 100000000 max_content_path: 100000000
flask_log_level: error flask_log_level: error
flask_debug_enable: false
queue_eval_seconds: 1 queue_eval_seconds: 1

View File

@@ -1,21 +1,23 @@
#!/usr/bin/env python #!/usr/bin/env python
import argparse
import json
import logging import logging
import os
import platform import platform
import socket import socket
import threading import threading
import time import time
import uuid import uuid
from datetime import datetime from datetime import datetime
import psutil import psutil
import requests import requests
import yaml import yaml
import json from flask import Flask, jsonify, request, render_template
import os from werkzeug.utils import secure_filename
from flask import Flask, jsonify, request
from utilities.render_worker import RenderWorkerFactory, RenderStatus, string_to_status from utilities.render_worker import RenderWorkerFactory, RenderStatus, string_to_status
data = 'foo'
app = Flask(__name__) app = Flask(__name__)
logger = logging.getLogger() logger = logging.getLogger()
@@ -90,8 +92,10 @@ class RenderServer:
render_queue = [] render_queue = []
render_clients = [] render_clients = []
maximum_renderer_instances = {'Blender': 2, 'After Effects': 1, 'ffmpeg': 4} maximum_renderer_instances = {'Blender': 2, 'After Effects': 1, 'ffmpeg': 4}
host_name = socket.gethostname() host_name = None
port = 8080 port = 8080
client_mode = False
server_hostname = None
last_saved_counts = {} last_saved_counts = {}
@@ -99,10 +103,11 @@ class RenderServer:
pass pass
@classmethod @classmethod
def add_to_render_queue(cls, render_job, force_start=False): def add_to_render_queue(cls, render_job, force_start=False, client=None):
if not render_job.client: if not client or render_job.client == cls.host_name:
logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job.render)) logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job.render))
render_job.client = cls.host_name
cls.render_queue.append(render_job) cls.render_queue.append(render_job)
if force_start: if force_start:
cls.start_job(render_job) cls.start_job(render_job)
@@ -271,15 +276,73 @@ class RenderServer:
@classmethod @classmethod
def register_client(cls, hostname): def register_client(cls, hostname):
# todo: register this
response = requests.get("https://{}/_register_".format(hostname)) success = False
print(response.status_code)
pass if hostname in cls.render_clients:
logger.warning(f"Client '{hostname}' already registered")
return success
try:
response = requests.get(f"https://{hostname}/status", timeout=1)
if response.ok:
cls.render_clients.append(hostname)
logger.info(f"Client '{hostname}' successfully registered")
success = True
except requests.exceptions.ConnectTimeout as e:
logger.error(f"Cannot connect to client at hostname: {hostname}")
return success
@classmethod @classmethod
def unregister_client(cls): def unregister_client(cls):
pass pass
@classmethod
def start(cls, background_thread=False):
def eval_loop(delay_sec=1):
while True:
cls.evaluate_queue()
time.sleep(delay_sec)
logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S',
level=logging.INFO)
with open('config.yaml') as f:
config = yaml.load(f, Loader=yaml.FullLoader)
app.config['UPLOAD_FOLDER'] = config['upload_folder']
app.config['MAX_CONTENT_PATH'] = config['max_content_path']
# Get hostname and render clients
cls.host_name = socket.gethostname()
if not RenderServer.render_clients:
cls.render_clients = [RenderServer.host_name]
if not cls.client_mode:
logger.info(f"Starting Zordon in Server Mode - {cls.host_name}")
else:
logger.info(f"Starting Zordon in Client Mode - {cls.host_name}")
# disable most Flask logging
flask_log = logging.getLogger('werkzeug')
flask_log.setLevel(config.get('flask_log_level', 'ERROR').upper())
# Setup the RenderServer object
cls.load_history()
thread = threading.Thread(target=eval_loop, kwargs={'delay_sec': config.get('queue_eval_seconds', 1)}, daemon=True)
thread.start()
if background_thread:
server_thread = threading.Thread(
target=lambda: app.run(host=cls.host_name, port=cls.port, debug=False, use_reloader=False))
server_thread.start()
server_thread.join()
else:
app.run(host=cls.host_name, port=cls.port, debug=config.get('flask_debug_enable', False),
use_reloader=False)
@app.get('/jobs') @app.get('/jobs')
def jobs_json(): def jobs_json():
@@ -290,33 +353,51 @@ def jobs_json():
def filtered_jobs_json(status_val): def filtered_jobs_json(status_val):
state = string_to_status(status_val) state = string_to_status(status_val)
jobs = [json.loads(x.json()) for x in RenderServer.jobs_with_status(state)] jobs = [json.loads(x.json()) for x in RenderServer.jobs_with_status(state)]
return jsonify(jobs) return jobs
@app.get('/job_status/<job_id>')
def get_job_status(job_id):
found_job = RenderServer.job_with_id(job_id)
if found_job:
return found_job.json()
else:
return None
@app.post('/register_client')
def register_client():
client_hostname = request.values['hostname']
x = RenderServer.register_client(client_hostname)
return "Success" if x else "Fail"
@app.get('/full_status') @app.get('/full_status')
def full_status(): def full_status():
full_results = {'timestamp': datetime.now().isoformat(), 'servers': {}} full_results = {'timestamp': datetime.now().isoformat(), 'servers': {}}
# todo: iterate through all servers
server_list = [socket.gethostname()]
try: try:
for server_hostname in server_list: for client_hostname in RenderServer.render_clients:
if server_hostname == local_hostname: if client_hostname == local_hostname:
server_status = RenderServer.status() snapshot_results = snapshot()
server_jobs = [json.loads(x.json()) for x in RenderServer.render_queue if not x.archived]
else: else:
server_status = requests.get(f'http://{server_hostname}:8080/status', timeout=1).json() snapshot_results = requests.get(f'http://{client_hostname}:8080/snapshot', timeout=1).json()
server_jobs = requests.get(f'http://{server_hostname}:8080/jobs', timeout=1).json() server_data = {'status': snapshot_results.get('status'), 'jobs': snapshot_results.get('jobs')}
server_data = {'status': server_status, 'jobs': server_jobs} full_results['servers'][client_hostname] = server_data
full_results['servers'][server_hostname] = server_data
except Exception as e: except Exception as e:
logger.error(f"Exception fetching full status: {e}") logger.error(f"Exception fetching full status: {e}")
return full_results return full_results
@app.get('/snapshot')
def snapshot():
server_status = RenderServer.status()
server_jobs = [json.loads(x.json()) for x in RenderServer.render_queue if not x.archived]
server_data = {'status': server_status, 'jobs': server_jobs, 'timestamp': datetime.now().isoformat()}
return server_data
@app.post('/add_job') @app.post('/add_job')
def add_job(): def add_job():
"""Create new job and add to server render queue""" """Create new job and add to server render queue"""
@@ -327,16 +408,6 @@ def add_job():
args = request.json.get('args', None) args = request.json.get('args', None)
force_start = request.json.get('force_start', False) force_start = request.json.get('force_start', False)
return add_job_handler(renderer, input_path, output_path, args=args, priority=priority, force_start=force_start)
def add_job_handler(renderer, input_path, output_path, args=None, priority=2, force_start=False):
if not os.path.exists(input_path):
err_msg = f"Cannot add job. Cannot find input file: {input_path}"
logger.error(err_msg)
return {"error": err_msg}, 400
try: try:
render_job = RenderWorkerFactory.create_worker(renderer, input_path, output_path, args) render_job = RenderWorkerFactory.create_worker(renderer, input_path, output_path, args)
except ValueError as e: except ValueError as e:
@@ -399,25 +470,38 @@ def upload_file():
if not uploaded_file.filename: if not uploaded_file.filename:
return {'error': 'no file uploaded'} return {'error': 'no file uploaded'}
with open('config.yaml') as f: # generate directory to use
config = yaml.load(f, Loader=yaml.FullLoader) logger.debug(f"Receiving uploaded file {uploaded_file.filename}")
new_id = RenderJob.generate_id()
job_dir = os.path.join(app.config['UPLOAD_FOLDER'], new_id + "-" + uploaded_file.filename)
if not os.path.exists(job_dir):
os.makedirs(job_dir)
# disable most Flask logging local_path = os.path.join(job_dir, secure_filename(uploaded_file.filename))
flask_log = logging.getLogger('werkzeug') uploaded_file.save(local_path)
flask_log.setLevel(logging.ERROR) renderer = request.values['renderer']
logging.basicConfig(format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S', level=logging.INFO) # todo: finish output_path and args - currently placeholder data
output_path = os.path.join(job_dir, uploaded_file.filename + "-output.mp4")
# Setup the RenderServer object if renderer == 'blender':
RenderServer.load_history() args = {'engine': request.values['blender_engine']}
RenderServer.evaluate_queue() else:
args = None
thread = threading.Thread(target=eval_loop, daemon=True) try:
thread.start() render_job = RenderWorkerFactory.create_worker(renderer, local_path, output_path, args=args)
server_thread = threading.Thread(target=lambda: app.run(host=RenderServer.host_name, port=RenderServer.port, debug=False, use_reloader=False)) new_job = RenderJob(render_job, custom_id=new_id)
server_thread.start() RenderServer.add_to_render_queue(new_job)
if not background_thread: return new_job.json()
server_thread.join() except ValueError as e:
logger.exception(e)
return {'error': str(e)}, 400
except Exception as e:
logger.exception(e)
return {'error': 'unknown error'}
if __name__ == '__main__': if __name__ == '__main__':