Initial commit

This commit is contained in:
Brett Williams
2022-10-04 23:09:09 -07:00
commit 1c304e77f5
13 changed files with 1717 additions and 0 deletions

425
zordon_server.py Executable file
View File

@@ -0,0 +1,425 @@
#!/usr/bin/env python
import logging
import platform
import socket
import threading
import time
import uuid
from datetime import datetime
import psutil
import requests
import yaml
import json
import os
from flask import Flask, jsonify, request
from utilities.aerender import AERenderer
from utilities.blender import BlenderRenderer
from utilities.ffmpeg_render import FFMPEGRenderer
from utilities.generic_renderer import RenderStatus
from utilities.generic_renderer import string_to_status
data = 'foo'
app = Flask(__name__)
logger = logging.getLogger()
local_hostname = socket.gethostname()
JSON_FILE = 'job_history.json'
#todo: move history to sqlite db
class RenderJob:
def __init__(self, render, priority, owner=None, client=None, notify=None):
self.id = str(uuid.uuid4()).split('-')[0]
self.owner = owner
self.render = render
self.priority = priority
self.client = client
self.notify = notify
self.date_created = datetime.now()
self.scheduled_start = None
self.renderer = render.renderer
self.name = os.path.basename(render.input) + '_' + self.date_created.isoformat()
self.archived = False
def render_status(self):
"""Returns status of render job"""
try:
if self.scheduled_start and self.render.status == RenderStatus.NOT_STARTED:
return RenderStatus.SCHEDULED
else:
return self.render.status
except Exception as e:
logger.warning("render_status error: {}".format(e))
return RenderStatus.ERROR
def json(self):
"""Converts RenderJob into JSON format"""
import numbers
def date_serializer(o):
if isinstance(o, datetime):
return o.isoformat()
json_string = ''
try:
d = self.__dict__.copy()
d['status'] = self.render_status().value
d['render'] = self.render.__dict__.copy()
for key in ['thread', 'process']: # remove unwanted keys from JSON
d['render'].pop(key, None)
d['render']['status'] = d['status']
# jobs from current_session generate percent completed
# jobs after loading server pull in a saved value. Have to check if callable object or not
percent_complete = self.render.percent_complete if isinstance(self.render.percent_complete, numbers.Number) \
else self.render.percent_complete()
d['render']['percent_complete'] = percent_complete
json_string = json.dumps(d, default=date_serializer)
except Exception as e:
logger.error("Error converting to JSON: {}".format(e))
return json_string
def render_factory(input_path, output_path):
if '.blend' in input_path.lower():
return BlenderRenderer(input_path, output_path)
class RenderServer:
render_queue = []
render_clients = []
maximum_renderer_instances = {'Blender': 2, 'After Effects': 1, 'ffmpeg': 4}
host_name = socket.gethostname()
port = 8080
last_saved_counts = {}
def __init__(self):
pass
@classmethod
def add_to_render_queue(cls, render_job, force_start=False):
if not render_job.client:
logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job.render))
cls.render_queue.append(render_job)
if force_start:
cls.start_job(render_job)
else:
cls.evaluate_queue()
else:
# todo: implement client rendering
logger.warning('remote client rendering not implemented yet')
@classmethod
def running_jobs(cls):
return cls.jobs_with_status(RenderStatus.RUNNING)
@classmethod
def pending_jobs(cls):
pending_jobs = cls.jobs_with_status(RenderStatus.NOT_STARTED)
pending_jobs.extend(cls.jobs_with_status(RenderStatus.SCHEDULED))
return pending_jobs
@classmethod
def jobs_with_status(cls, status, priority_sorted=False, include_archived=True):
found_jobs = [x for x in cls.render_queue if x.render_status() == status]
if not include_archived:
found_jobs = [x for x in found_jobs if not x.archived]
if priority_sorted:
found_jobs = sorted(found_jobs, key=lambda a: a.priority, reverse=False)
return found_jobs
@classmethod
def clear_history(cls):
to_remove = [x for x in cls.render_queue if x.render_status() in [RenderStatus.CANCELLED,
RenderStatus.COMPLETED, RenderStatus.ERROR]]
for x in to_remove:
x.archived = True
@classmethod
def load_history(cls, json_path=None):
input_path = json_path or JSON_FILE
if os.path.exists(input_path):
f = open(input_path)
job_list = json.load(f)
for job in job_list:
# Identify renderer type and recreate Renderer object
# TODO: refactor to factory class
job_render_object = None
if job['renderer'] == 'Blender':
job_render_object = BlenderRenderer(job['render']['input'], job['render']['output'])
elif job['renderer'] == 'After Effects':
AERenderer()
elif job['renderer'] == 'ffmpeg':
job_render_object = FFMPEGRenderer(job['render']['input'], job['render']['output'])
# Load Renderer values
for key, val in job['render'].items():
if val and key in ['start_time', 'end_time']: # convert date strings back into date objects
job_render_object.__dict__[key] = datetime.fromisoformat(val)
else:
job_render_object.__dict__[key] = val
job_render_object.status = RenderStatus[job['status'].upper()]
job.pop('render', None)
# Create RenderJob with re-created Renderer object
new_job = RenderJob(job_render_object, job['priority'], job['client'])
for key, val in job.items():
if key in ['date_created']: # convert date strings back to datetime objects
new_job.__dict__[key] = datetime.fromisoformat(val)
else:
new_job.__dict__[key] = val
new_job.__delattr__('status')
# Handle older loaded jobs that were cancelled before closing
if new_job.render_status() == RenderStatus.RUNNING:
new_job.render.status = RenderStatus.CANCELLED
# finally add back to render queue
cls.render_queue.append(new_job)
f.close()
cls.last_saved_counts = cls.job_counts()
@classmethod
def save_history(cls, json_path=None):
"""Save job history to JSON file"""
try:
logger.debug("Saving Render History")
new_list = []
for job in cls.render_queue:
new_list.append(json.loads(job.json()))
output_path = json_path or JSON_FILE
with open(output_path, 'w') as f:
json.dump(new_list, f, indent=4)
cls.last_saved_counts = cls.job_counts()
except Exception as e:
logger.error("Error saving jobs JSON: {}".format(e))
@classmethod
def evaluate_queue(cls):
instances = cls.renderer_instances()
not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
if not_started:
for job in not_started:
renderer = job.render.renderer
higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < job.priority]
max_renderers = renderer in instances.keys() and instances[
renderer] >= cls.maximum_renderer_instances.get(renderer, 1)
if not max_renderers and not higher_priority_jobs:
cls.start_job(job)
scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True)
for job in scheduled:
if job.scheduled_start <= datetime.now():
cls.start_job(job)
if cls.last_saved_counts != cls.job_counts():
cls.save_history()
@classmethod
def start_job(cls, job):
logger.info('Starting {}render: {} - Priority {}'.format('scheduled ' if job.scheduled_start else '', job.name,
job.priority))
job.render.start()
@classmethod
def cancel_job(cls, job):
logger.info('Cancelling job ID: {}'.format(job.id))
if job.render_status() in [RenderStatus.NOT_STARTED, RenderStatus.RUNNING, RenderStatus.ERROR]:
job.render.stop()
job.render.status = RenderStatus.CANCELLED
return True
return False
@classmethod
def renderer_instances(cls):
from collections import Counter
all_instances = [x.render.renderer for x in cls.running_jobs()]
return Counter(all_instances)
@classmethod
def job_counts(cls):
job_counts = {}
for job_status in RenderStatus:
job_counts[job_status.value] = len(RenderServer.jobs_with_status(job_status))
return job_counts
@classmethod
def status(cls):
stats = {"timestamp": datetime.now().isoformat(),
"platform": platform.platform(),
"cpu_percent": psutil.cpu_percent(percpu=False),
"cpu_count": psutil.cpu_count(),
"memory_total": psutil.virtual_memory().total,
"memory_available": psutil.virtual_memory().available,
"memory_percent": psutil.virtual_memory().percent,
"job_counts": RenderServer.job_counts(),
"host_name": RenderServer.host_name
}
return stats
@classmethod
def all_jobs(cls):
all_jobs = [x for x in RenderServer.render_queue if not x.archived]
return all_jobs
@classmethod
def register_client(cls, hostname):
# todo: register this
response = requests.get("https://{}/_register_".format(hostname))
print(response.status_code)
pass
@classmethod
def unregister_client(cls):
pass
@app.get('/jobs')
def jobs_json():
return jsonify([json.loads(x.json()) for x in RenderServer.render_queue if not x.archived])
@app.get('/jobs/<status_val>')
def filtered_jobs_json(status_val):
state = string_to_status(status_val)
jobs = [json.loads(x.json()) for x in RenderServer.jobs_with_status(state)]
return jsonify(jobs)
@app.get('/full_status')
def full_status():
full_results = {'timestamp': datetime.now().isoformat(), 'servers': {}}
# todo: iterate through all servers
server_list = [socket.gethostname()]
try:
for server_hostname in server_list:
if server_hostname == local_hostname:
server_status = RenderServer.status()
server_jobs = [json.loads(x.json()) for x in RenderServer.render_queue if not x.archived]
else:
server_status = requests.get(f'http://{server_hostname}:8080/status', timeout=1).json()
server_jobs = requests.get(f'http://{server_hostname}:8080/jobs', timeout=1).json()
server_data = {'status': server_status, 'jobs': server_jobs}
full_results['servers'][server_hostname] = server_data
except Exception as e:
logger.error(f"Exception fetching full status: {e}")
return full_results
@app.post('/add_job')
def add_job():
"""Create new job and add to server render queue"""
renderer = request.json["renderer"]
input_path = request.json["input"]
output_path = request.json["output"]
if not os.path.exists(input_path):
err_msg = f"Cannot add job. Cannot find input file: {input_path}"
logger.error(err_msg)
return {"error": err_msg}, 400
# todo: create factory class for creating renderers
if "blender" in renderer:
render_job = BlenderRenderer(input_path, output_path)
render_job.engine = request.json.get('engine', 'BLENDER_EEVEE')
elif "aerender" in renderer:
render_job = AERenderer(input_path, output_path)
elif "ffmpeg" in renderer:
render_job = FFMPEGRenderer(input_path, output_path, args=request.json.get('args', None))
else:
err_msg = "Unknown renderer: {}".format(renderer)
logger.error(err_msg)
return {'error': err_msg}, 400
new_job = RenderJob(render_job, priority=request.json.get('priority', 2))
RenderServer.add_to_render_queue(new_job, force_start=request.json.get('force_start', False))
return new_job.json()
@app.get('/cancel_job')
def cancel_job():
job_id = request.args.get('id', None)
confirm = request.args.get('confirm', False)
if not job_id:
return jsonify({'error': 'job id not found'})
elif not confirm:
return jsonify({'error': 'confirmation required'})
else:
found = [x for x in RenderServer.render_queue if x.id == job_id]
if len(found) > 1:
# logger.error('Multiple jobs found for ID {}'.format(job_id))
return jsonify({'error': 'multiple jobs found for ID {}'.format(job_id)})
elif found:
success = RenderServer.cancel_job(found[0])
return jsonify({'result': success})
return jsonify({'error': 'job not found'})
@app.get('/clear_history')
def clear_history():
RenderServer.clear_history()
return jsonify({'result': True})
@app.route('/status')
def status():
return jsonify(RenderServer.status())
@app.route('/')
def default():
return "Server running"
def start_server(background_thread=False):
def eval_loop():
while True:
RenderServer.evaluate_queue()
time.sleep(1)
with open('config.yaml') as f:
config = yaml.load(f, Loader=yaml.FullLoader)
# disable most Flask logging
flask_log = logging.getLogger('werkzeug')
flask_log.setLevel(logging.ERROR)
logging.basicConfig(format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S', level=logging.INFO)
# Setup the RenderServer object
RenderServer.load_history()
RenderServer.evaluate_queue()
thread = threading.Thread(target=eval_loop, daemon=True)
thread.start()
server_thread = threading.Thread(target=lambda: app.run(host=RenderServer.host_name, port=RenderServer.port, debug=False, use_reloader=False))
server_thread.start()
if not background_thread:
server_thread.join()
if __name__ == '__main__':
start_server()