Files
Zordon/zordon_server.py
2022-10-11 21:41:12 -07:00

546 lines
19 KiB
Python
Executable File

#!/usr/bin/env python
import argparse
import json
import logging
import os
import platform
import socket
import threading
import time
import uuid
from datetime import datetime
import psutil
import requests
import yaml
from flask import Flask, jsonify, request, render_template
from werkzeug.utils import secure_filename
from utilities.render_worker import RenderWorkerFactory, RenderStatus, string_to_status
# Module-level singletons used by the RenderServer class and the Flask routes.
JSON_FILE = 'server_state.json'  # default location of the persisted render history
local_hostname = socket.gethostname()  # compared against registered client names in /full_status
logger = logging.getLogger()  # root logger; RenderServer.start() configures it via logging.basicConfig
app = Flask(__name__)
# TODO: move history to sqlite db
class RenderJob:
    """A render worker wrapped with queue metadata (ID, priority, owner, schedule)."""

    def __init__(self, render, priority=2, owner=None, client=None, notify=None, custom_id=None):
        """Wrap ``render`` in a queue entry.

        :param render: worker object; must expose ``renderer`` and ``input`` attributes.
        :param priority: scheduling priority; the server treats lower numbers as more important.
        :param owner: optional owner label, stored as-is.
        :param client: hostname the job is assigned to.
        :param notify: optional notification target, stored as-is.
        :param custom_id: ID to use instead of a freshly generated one.
        """
        # NOTE: keep this assignment order - json() serialises __dict__ in insertion order.
        self.id = custom_id or self.generate_id()
        self.owner = owner
        self.render = render
        self.priority = priority
        self.client = client
        self.notify = notify
        self.date_created = datetime.now()
        self.scheduled_start = None
        self.renderer = render.renderer
        input_name = os.path.basename(render.input)
        self.name = input_name + '_' + self.date_created.isoformat()
        self.archived = False

    def render_status(self):
        """Return the job's effective RenderStatus.

        A job with a ``scheduled_start`` whose worker has not started yet is reported
        as SCHEDULED; otherwise the worker's own status is returned. Any exception
        while reading the status is logged and reported as ERROR.
        """
        try:
            if not self.scheduled_start or self.render.status != RenderStatus.NOT_STARTED:
                return self.render.status
            return RenderStatus.SCHEDULED
        except Exception as e:
            logger.warning(f"render_status error: {e}")
            return RenderStatus.ERROR

    def json(self):
        """Serialise this job and its worker's state to a JSON string.

        The worker's ``thread`` and ``process`` entries are dropped, its ``status`` is
        replaced by the job's effective status, and ``percent_complete`` is resolved to
        a number. On any error the problem is logged and an empty string is returned.
        """
        import numbers

        def date_serializer(o):
            # json.dumps fallback: datetimes become ISO-8601 strings, anything else null.
            return o.isoformat() if isinstance(o, datetime) else None

        try:
            payload = dict(self.__dict__)
            payload['status'] = self.render_status().value
            render_fields = {key: value for key, value in self.render.__dict__.items()
                             if key not in ('thread', 'process')}
            payload['render'] = render_fields
            render_fields['status'] = payload['status']
            # Live workers expose percent_complete as a method; workers rebuilt from the
            # saved state file carry the stored number instead.
            progress = self.render.percent_complete
            if not isinstance(progress, numbers.Number):
                progress = progress()
            render_fields['percent_complete'] = progress
            return json.dumps(payload, default=date_serializer)
        except Exception as e:
            logger.error(f"Error converting to JSON: {e}")
            return ''

    @classmethod
    def generate_id(cls):
        """Return a short random job ID: the first 8 hex digits of a UUID4."""
        return uuid.uuid4().hex[:8]
class RenderServer:
    """Render queue, scheduler and client registry for this host.

    All state lives in class attributes and every method is a classmethod, so the
    class is used directly rather than instantiated.
    """

    render_queue = []  # every RenderJob known to this host, archived ones included
    render_clients = []  # hostnames of registered render nodes
    # Per-renderer concurrency caps; renderers not listed here default to 1.
    maximum_renderer_instances = {'Blender': 2, 'After Effects': 1, 'ffmpeg': 4}
    host_name = None  # set from socket.gethostname() in start()
    port = 8080
    client_mode = False
    server_hostname = None
    last_saved_counts = {}  # job_counts() at the last save; lets evaluate_queue skip no-op saves

    def __init__(self):
        pass

    @classmethod
    def add_to_render_queue(cls, render_job, force_start=False, client=None):
        """Queue ``render_job`` on this host, then start it or re-evaluate the queue.

        :param render_job: the RenderJob to queue.
        :param force_start: start the job immediately, bypassing the scheduler's checks.
        :param client: target hostname; rendering on a remote client is not implemented.
        """
        # NOTE(review): the second condition tests render_job.client rather than the
        # ``client`` argument - confirm that is intended.
        if not client or render_job.client == cls.host_name:
            logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job.render))
            render_job.client = cls.host_name
            cls.render_queue.append(render_job)
            if force_start:
                cls.start_job(render_job)
            else:
                cls.evaluate_queue()
        else:
            # TODO: implement client rendering
            logger.warning('remote client rendering not implemented yet')

    @classmethod
    def running_jobs(cls):
        """Return all jobs whose effective status is RUNNING."""
        return cls.jobs_with_status(RenderStatus.RUNNING)

    @classmethod
    def pending_jobs(cls):
        """Return NOT_STARTED jobs followed by SCHEDULED jobs."""
        pending_jobs = cls.jobs_with_status(RenderStatus.NOT_STARTED)
        pending_jobs.extend(cls.jobs_with_status(RenderStatus.SCHEDULED))
        return pending_jobs

    @classmethod
    def jobs_with_status(cls, status, priority_sorted=False, include_archived=True):
        """Return queued jobs whose effective status equals ``status``.

        :param priority_sorted: sort ascending by priority (most important first).
        :param include_archived: when False, drop jobs archived by clear_history().
        """
        found_jobs = [x for x in cls.render_queue if x.render_status() == status]
        if not include_archived:
            found_jobs = [x for x in found_jobs if not x.archived]
        if priority_sorted:
            found_jobs = sorted(found_jobs, key=lambda a: a.priority)
        return found_jobs

    @classmethod
    def job_with_id(cls, job_id):
        """Return the first queued job with ID ``job_id``, or None."""
        return next((x for x in cls.render_queue if x.id == job_id), None)

    @classmethod
    def clear_history(cls):
        """Archive every CANCELLED, COMPLETED or ERROR job and save state."""
        finished = [RenderStatus.CANCELLED, RenderStatus.COMPLETED, RenderStatus.ERROR]
        for job in cls.render_queue:
            if job.render_status() in finished:
                job.archived = True
        cls.save_state()

    @classmethod
    def load_state(cls, json_path=None):
        """Load state history from a JSON file previously written by save_state().

        Does nothing but reset the saved-count snapshot if the file does not exist.
        Jobs recorded as RUNNING are marked CANCELLED, because their worker did not
        survive the restart.

        :param json_path: state file to read; defaults to JSON_FILE.
        """
        input_path = json_path or JSON_FILE
        if os.path.exists(input_path):
            with open(input_path) as f:
                saved_state = json.load(f)
            # Keep the current client list when the file has none. (A {} default would
            # also break the list operations used by register/unregister_client.)
            cls.render_clients = saved_state.get('clients') or cls.render_clients
            for job in saved_state.get('jobs', []):
                cls.render_queue.append(cls._job_from_saved(job))
        cls.last_saved_counts = cls.job_counts()

    @classmethod
    def _job_from_saved(cls, job):
        """Rebuild one RenderJob, and its worker, from a dict produced by RenderJob.json()."""
        render_data = job.pop('render')
        # Identify renderer type and recreate the worker object
        worker = RenderWorkerFactory.create_worker(job['renderer'], input_path=render_data['input'],
                                                   output_path=render_data['output'])
        for key, val in render_data.items():
            if val and key in ('start_time', 'end_time'):  # ISO strings back to datetimes
                val = datetime.fromisoformat(val)
            worker.__dict__[key] = val
        worker.status = RenderStatus[job['status'].upper()]

        new_job = RenderJob(worker, job['priority'], client=job['client'])
        for key, val in job.items():
            # scheduled_start must be a datetime again: evaluate_queue compares it with <=.
            if val and key in ('date_created', 'scheduled_start'):
                val = datetime.fromisoformat(val)
            new_job.__dict__[key] = val
        # 'status' is derived from the worker by render_status(); don't keep a stale copy.
        new_job.__dict__.pop('status', None)
        # Handle older loaded jobs that were still running when the server stopped
        if new_job.render_status() == RenderStatus.RUNNING:
            new_job.render.status = RenderStatus.CANCELLED
        return new_job

    @classmethod
    def save_state(cls, json_path=None):
        """Save the job history and client list to a JSON file.

        The data is written to a temporary sibling file and moved into place with
        os.replace(), so an interrupted write cannot leave a truncated file behind for
        load_state() to fail on. Errors are logged, not raised.

        :param json_path: state file to write; defaults to JSON_FILE.
        """
        try:
            logger.debug("Saving Render History")
            output = {'timestamp': datetime.now().isoformat(),
                      'jobs': [json.loads(j.json()) for j in cls.render_queue],
                      'clients': cls.render_clients}
            output_path = json_path or JSON_FILE
            temp_path = f"{output_path}.tmp"
            with open(temp_path, 'w') as f:
                json.dump(output, f, indent=4)
            os.replace(temp_path, output_path)
            cls.last_saved_counts = cls.job_counts()
        except Exception as e:
            logger.error("Error saving state JSON: {}".format(e))

    @classmethod
    def evaluate_queue(cls):
        """Start every job that may run now, then save state if job counts changed.

        A NOT_STARTED job starts when two conditions hold: its renderer is below its cap
        in maximum_renderer_instances (default 1), and no RUNNING job has a lower
        priority number. A SCHEDULED job starts once its scheduled_start has passed;
        caps are not checked for scheduled jobs.
        """
        instances = cls.renderer_instances()
        for job in cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True):
            renderer = job.render.renderer
            higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < job.priority]
            limit = cls.maximum_renderer_instances.get(renderer, 1)
            at_capacity = renderer in instances and instances[renderer] >= limit
            if not at_capacity and not higher_priority_jobs:
                cls.start_job(job)
                # Count the job just started. Otherwise a single pass could start every
                # queued job for this renderer and exceed its cap.
                instances[renderer] += 1

        for job in cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True):
            if job.scheduled_start <= datetime.now():
                cls.start_job(job)

        if cls.last_saved_counts != cls.job_counts():
            cls.save_state()

    @classmethod
    def start_job(cls, job):
        """Log and start ``job``'s worker."""
        logger.info('Starting {}render: {} - Priority {}'.format('scheduled ' if job.scheduled_start else '', job.name,
                                                                 job.priority))
        job.render.start()

    @classmethod
    def cancel_job(cls, job):
        """Stop ``job``'s worker and mark it CANCELLED.

        Pending (NOT_STARTED or SCHEDULED), RUNNING and ERROR jobs can be cancelled.

        :return: True if the job was cancelled, False if its status does not allow it.
        """
        logger.info('Cancelling job ID: {}'.format(job.id))
        cancellable = [RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED, RenderStatus.RUNNING, RenderStatus.ERROR]
        if job.render_status() in cancellable:
            job.render.stop()
            job.render.status = RenderStatus.CANCELLED
            return True
        return False

    @classmethod
    def renderer_instances(cls):
        """Return a Counter mapping renderer name -> number of RUNNING jobs using it."""
        from collections import Counter
        return Counter(x.render.renderer for x in cls.running_jobs())

    @classmethod
    def job_counts(cls):
        """Return {status value: number of jobs with that status} for every RenderStatus."""
        return {job_status.value: len(cls.jobs_with_status(job_status)) for job_status in RenderStatus}

    @classmethod
    def status(cls):
        """Return host and queue statistics (platform, CPU, memory, job counts, host name)."""
        # Sample memory once so total/available/percent come from the same snapshot.
        memory = psutil.virtual_memory()
        return {"timestamp": datetime.now().isoformat(),
                "platform": platform.platform(),
                "cpu_percent": psutil.cpu_percent(percpu=False),
                "cpu_percent_per_cpu": psutil.cpu_percent(percpu=True),
                "cpu_count": psutil.cpu_count(),
                "memory_total": memory.total,
                "memory_available": memory.available,
                "memory_percent": memory.percent,
                "job_counts": cls.job_counts(),
                "host_name": cls.host_name
                }

    @classmethod
    def all_jobs(cls):
        """Return every job that has not been archived."""
        return [x for x in cls.render_queue if not x.archived]

    @classmethod
    def register_client(cls, hostname):
        """Add ``hostname`` to render_clients if its /status endpoint responds OK.

        On success the client list is saved.

        :return: True on success; False if the host is already registered,
            unreachable, or does not answer OK.
        """
        if hostname in cls.render_clients:
            logger.warning(f"Client '{hostname}' already registered")
            return False
        try:
            response = requests.get(f"http://{hostname}:8080/status", timeout=1)
        except requests.RequestException:
            # RequestException covers refused connections and DNS failures as well as
            # timeouts. A read timeout is not a ConnectionError.
            logger.error(f"Cannot connect to client at hostname: {hostname}")
            return False
        if not response.ok:
            return False
        cls.render_clients.append(hostname)
        logger.info(f"Client '{hostname}' successfully registered")
        cls.save_state()
        return True

    @classmethod
    def unregister_client(cls, hostname):
        """Remove ``hostname`` from render_clients and save the client list.

        This host itself cannot be unregistered.

        :return: True if the client was removed, False otherwise.
        """
        if hostname not in cls.render_clients or hostname == cls.host_name:
            return False
        cls.render_clients.remove(hostname)
        logger.info(f"Client '{hostname}' successfully unregistered")
        # Persist like register_client does, so the removal survives a restart.
        cls.save_state()
        return True

    @classmethod
    def start(cls, background_thread=False):
        """Start the server.

        Steps, in order:
        - configure logging and Flask from config.yaml;
        - load saved state;
        - start the queue-evaluation thread;
        - run the Flask app.

        This call blocks until the Flask server exits.

        :param background_thread: run Flask in a separate thread (debug and reloader
            off). This call still waits for that thread to finish.
        """
        def eval_loop(delay_sec=1):
            # An exception in one pass is logged and the loop continues. Otherwise it
            # would silently end this daemon thread, and the queue would stop being processed.
            while True:
                try:
                    cls.evaluate_queue()
                except Exception:
                    logger.exception("Error evaluating render queue")
                time.sleep(delay_sec)

        logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S',
                            level=logging.INFO)

        with open('config.yaml') as f:
            config = yaml.load(f, Loader=yaml.FullLoader)

        app.config['UPLOAD_FOLDER'] = config['upload_folder']
        # NOTE(review): Flask enforces an upload size limit via MAX_CONTENT_LENGTH;
        # MAX_CONTENT_PATH is not a Flask setting, so no limit is applied here - confirm
        # the unit of 'max_content_path' before wiring it to MAX_CONTENT_LENGTH.
        app.config['MAX_CONTENT_PATH'] = config['max_content_path']
        app.config['RESULT_STATIC_PATH'] = 'static/'

        # Get hostname and render clients
        cls.host_name = socket.gethostname()
        if not cls.render_clients:
            cls.render_clients = [cls.host_name]

        if not cls.client_mode:
            logger.info(f"Starting Zordon in Server Mode - {cls.host_name}")
        else:
            logger.info(f"Starting Zordon in Client Mode - {cls.host_name}")

        # disable most Flask logging
        flask_log = logging.getLogger('werkzeug')
        flask_log.setLevel(config.get('flask_log_level', 'ERROR').upper())

        # Set up the RenderServer state
        cls.load_state()

        thread = threading.Thread(target=eval_loop, kwargs={'delay_sec': config.get('queue_eval_seconds', 1)},
                                  daemon=True)
        thread.start()

        if background_thread:
            server_thread = threading.Thread(
                target=lambda: app.run(host='0.0.0.0', port=cls.port, debug=False, use_reloader=False))
            server_thread.start()
            server_thread.join()
        else:
            app.run(host='0.0.0.0', port=cls.port, debug=config.get('flask_debug_enable', False),
                    use_reloader=False)
@app.get('/jobs')
def jobs_json():
    """Return every non-archived job in the render queue as a list of JSON objects."""
    visible_jobs = (job for job in RenderServer.render_queue if not job.archived)
    return [json.loads(job.json()) for job in visible_jobs]
@app.get('/jobs/<status_val>')
def filtered_jobs_json(status_val):
    """Return all jobs (archived ones included) whose status matches ``status_val``."""
    wanted_status = string_to_status(status_val)
    matching = RenderServer.jobs_with_status(wanted_status)
    return [json.loads(job.json()) for job in matching]
@app.get('/job_status/<job_id>')
def get_job_status(job_id):
    """Return the JSON for job ``job_id``, or an error object with status 404.

    A Flask view must not return None (Flask raises a server error), so the
    not-found case returns an explicit error body instead.
    """
    found_job = RenderServer.job_with_id(job_id)
    if found_job is None:
        return {'error': 'job not found'}, 404
    return found_job.json()
@app.post('/register_client')
def register_client():
    """Register the host given in the ``hostname`` request value; reply "Success" or "Fail"."""
    registered = RenderServer.register_client(request.values['hostname'])
    if registered:
        return "Success"
    return "Fail"
@app.post('/unregister_client')
def unregister_client():
    """Unregister the host given in the ``hostname`` request value; reply "Success" or "Fail"."""
    removed = RenderServer.unregister_client(request.values['hostname'])
    if removed:
        return "Success"
    return "Fail"
@app.get('/full_status')
def full_status():
    """Gather status and jobs from this host and every registered render client.

    Returns ``{'timestamp': ..., 'servers': {hostname: {'status', 'jobs', 'is_online'}}}``.
    This host is read directly via snapshot(). Other hosts are queried at
    ``http://<host>:8080/snapshot`` with a 1 second timeout. A host that is
    unreachable, times out, or returns something other than a JSON object is reported
    with empty data instead of aborting the report for the remaining hosts.
    """
    full_results = {'timestamp': datetime.now().isoformat(), 'servers': {}}
    try:
        for client_hostname in RenderServer.render_clients:
            is_online = False
            if client_hostname == local_hostname:
                snapshot_results = snapshot()
                is_online = True
            else:
                snapshot_results = {}
                try:
                    snapshot_request = requests.get(f'http://{client_hostname}:8080/snapshot', timeout=1)
                    snapshot_results = snapshot_request.json()
                    is_online = snapshot_request.ok
                except (requests.RequestException, ValueError) as e:
                    # RequestException covers timeouts as well as refused connections;
                    # ValueError covers a body that is not valid JSON.
                    logger.debug(f"Could not fetch snapshot from {client_hostname}: {e}")
                if not isinstance(snapshot_results, dict):
                    # A JSON list or scalar has no .get(); treat it as no usable data.
                    snapshot_results = {}
            server_data = {'status': snapshot_results.get('status', {}), 'jobs': snapshot_results.get('jobs', {}),
                           'is_online': is_online}
            full_results['servers'][client_hostname] = server_data
    except Exception as e:
        logger.error(f"Exception fetching full status: {e}")
    return full_results
@app.get('/snapshot')
def snapshot():
    """Return this host's status and all non-archived jobs, stamped with the current time."""
    return {
        'status': RenderServer.status(),
        'jobs': [json.loads(job.json()) for job in RenderServer.render_queue if not job.archived],
        'timestamp': datetime.now().isoformat(),
    }
@app.post('/add_job')
def add_job():
    """Create a new job from the JSON request body and add it to the render queue.

    JSON keys:
    - ``renderer``, ``input``, ``output``: required.
    - ``priority``: a number, default 2.
    - ``args``, ``force_start``: optional.

    Returns the new job's JSON. Bad input returns ``({'error': ...}, 400)``, and so
    does a body that is not a JSON object.
    """
    # silent=True: a missing or invalid JSON body yields None rather than raising, so
    # the caller gets this route's own error format.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    renderer = payload.get("renderer", None)
    input_path = payload.get("input", None)
    output_path = payload.get("output", None)
    priority = payload.get('priority', 2)
    args = payload.get('args', None)
    force_start = payload.get('force_start', False)
    if None in [renderer, input_path, output_path]:
        return {'error': 'missing required parameters'}, 400
    # The scheduler sorts by priority and compares it with "<". A non-numeric value
    # would raise TypeError later inside the queue-evaluation loop, so reject it here.
    if isinstance(priority, bool) or not isinstance(priority, (int, float)):
        return {'error': 'priority must be a number'}, 400
    try:
        render_job = RenderWorkerFactory.create_worker(renderer, input_path, output_path, args)
    except ValueError as e:
        logger.exception(e)
        return {'error': str(e)}, 400
    new_job = RenderJob(render_job, priority=priority)
    RenderServer.add_to_render_queue(new_job, force_start=force_start)
    return new_job.json()
@app.get('/cancel_job')
def cancel_job():
    """Cancel the job named by the ``id`` query parameter.

    A ``confirm`` query parameter is required. It counts as not confirmed when
    missing, empty, or one of "false", "0", "no", "off" (case-insensitive).

    Returns ``{'result': bool}`` from RenderServer.cancel_job(). Otherwise returns an
    error object: 400 for bad or missing input or an unknown job, 500 for duplicate IDs.
    """
    job_id = request.args.get('id', None)
    confirm = request.args.get('confirm', '')
    if not job_id:
        return {'error': 'job id not found'}, 400
    # Any non-empty string is truthy, so a plain truthiness test would let
    # "?confirm=false" through; compare against explicit negative values instead.
    if confirm.strip().lower() in ('', 'false', '0', 'no', 'off'):
        return {'error': 'confirmation required'}, 400
    found = [x for x in RenderServer.render_queue if x.id == job_id]
    if len(found) > 1:
        # Duplicate IDs mean the server state is inconsistent, not that the request is bad.
        logger.error('Multiple jobs found for ID {}'.format(job_id))
        return jsonify({'error': 'multiple jobs found for ID {}'.format(job_id)}), 500
    if not found:
        return {'error': 'job not found'}, 400
    success = RenderServer.cancel_job(found[0])
    return jsonify({'result': success})
@app.get('/clear_history')
def clear_history():
    """Archive cancelled, completed and errored jobs, then report success."""
    RenderServer.clear_history()
    outcome = {'result': True}
    return outcome
@app.route('/status')
def status():
    """Return this host's system and queue statistics from RenderServer.status()."""
    host_stats = RenderServer.status()
    return host_stats
@app.route('/')
def default():
    """Liveness check: respond with a fixed plain-text message."""
    message = "Server running"
    return message
@app.route('/upload')
def upload_file_page():
    """Render the upload.html template (the file-upload form)."""
    template_name = 'upload.html'
    return render_template(template_name)
@app.route('/uploader', methods=['GET', 'POST'])
def upload_file():
    """Save an uploaded file under UPLOAD_FOLDER and queue a render job for it.

    Form fields:
    - ``file``: the upload.
    - ``renderer``: name of the renderer to use.
    - ``blender_engine``: required when the renderer is ``blender``.

    Returns the new job's JSON, or ``{'error': ...}`` with status 400 (bad input),
    405 (non-POST request) or 500 (unexpected failure).
    """
    if request.method != 'POST':
        # Only POST does any work. A view that returns None is not a valid Flask response.
        return {'error': 'file uploads must use POST'}, 405
    try:
        uploaded_file = request.files.get('file')
        if uploaded_file is None or not uploaded_file.filename:
            return {'error': 'no file uploaded'}, 400
        # The client-supplied filename is untrusted. Sanitise it before it appears in any
        # filesystem path, including the job directory, to prevent path traversal.
        safe_name = secure_filename(uploaded_file.filename)
        if not safe_name:
            return {'error': 'invalid file name'}, 400

        # Validate form fields before writing anything to disk.
        renderer = request.values.get('renderer')
        if not renderer:
            return {'error': 'missing renderer'}, 400
        if renderer == 'blender':
            engine = request.values.get('blender_engine')
            if not engine:
                return {'error': 'missing blender_engine'}, 400
            args = {'engine': engine}
        else:
            args = None

        # generate directory to use
        logger.debug(f"Receiving uploaded file {uploaded_file.filename}")
        new_id = RenderJob.generate_id()
        job_dir = os.path.join(app.config['UPLOAD_FOLDER'], new_id + "-" + safe_name)
        os.makedirs(job_dir, exist_ok=True)
        local_path = os.path.join(job_dir, safe_name)
        uploaded_file.save(local_path)

        # TODO: finish output_path and args - currently placeholder data
        output_path = os.path.join(job_dir, safe_name + "-output.mp4")
        try:
            render_job = RenderWorkerFactory.create_worker(renderer, local_path, output_path, args=args)
            new_job = RenderJob(render_job, custom_id=new_id)
            RenderServer.add_to_render_queue(new_job)
            return new_job.json()
        except ValueError as e:
            logger.exception(e)
            return {'error': str(e)}, 400
    except Exception as e:
        logger.exception(e)
        return {'error': 'unknown error'}, 500
if __name__ == '__main__':
    # Run as a script: start the server with Flask in the foreground.
    RenderServer.start(background_thread=False)