Refactor and reorganize code. Split server into manager and server files.

Brett Williams
2022-10-12 13:44:41 -07:00
parent 8ae50e2431
commit fd6af10d56
8 changed files with 630 additions and 631 deletions
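For context, the split means the Flask entry point now lives in server.py, the queue and scheduling logic in lib/render_manager.py, and the job model in lib/render_job.py. A minimal sketch of the new layout, not part of the diff, assuming the repository root is the working directory and config.yaml is present (the dashboard hunk below now imports start_server from the new server module):

# Minimal sketch, not part of the diff.
from lib.render_manager import RenderManager
from server import start_server

print(RenderManager.maximum_renderer_instances)   # queue/scheduling policy now lives here
start_server()                                    # blocks; serves the Flask routes on port 8080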

View File

@@ -17,8 +17,8 @@ from rich.table import Table
from rich.text import Text
from rich.tree import Tree
import zordon_server
from zordon_server import RenderStatus, string_to_status
from utilities.render_worker import RenderStatus, string_to_status
from server import start_server
"""
The RenderDashboard is designed to be run on a remote machine or on the local server
@@ -235,10 +235,10 @@ if __name__ == '__main__':
if not client.connect():
if client.server_ip == local_hostname:
start_server = input("Local server not running. Start server? (y/n) ")
if start_server and start_server[0].lower() == "y":
start_server_input = input("Local server not running. Start server? (y/n) ")
if start_server_input and start_server_input[0].lower() == "y":
# Startup the local server
zordon_server.RenderServer.start(background_thread=True)
start_server(background_thread=True)
test = client.connect()
print(f"connected? {test}")
else:

0  lib/__init__.py  Normal file

70  lib/render_job.py  Normal file

@@ -0,0 +1,70 @@
import json
import logging
import os
import uuid
from datetime import datetime
from utilities.render_worker import RenderStatus
logger = logging.getLogger()
class RenderJob:
def __init__(self, render, priority=2, owner=None, client=None, notify=None, custom_id=None):
self.id = custom_id or self.generate_id()
self.owner = owner
self.render = render
self.priority = priority
self.client = client
self.notify = notify
self.date_created = datetime.now()
self.scheduled_start = None
self.renderer = render.renderer
self.name = os.path.basename(render.input) + '_' + self.date_created.isoformat()
self.archived = False
def render_status(self):
"""Returns status of render job"""
try:
if self.scheduled_start and self.render.status == RenderStatus.NOT_STARTED:
return RenderStatus.SCHEDULED
else:
return self.render.status
except Exception as e:
logger.warning("render_status error: {}".format(e))
return RenderStatus.ERROR
def json(self):
"""Converts RenderJob into JSON format"""
import numbers
def date_serializer(o):
if isinstance(o, datetime):
return o.isoformat()
json_string = ''
try:
d = self.__dict__.copy()
d['status'] = self.render_status().value
d['render'] = self.render.__dict__.copy()
for key in ['thread', 'process']: # remove unwanted keys from JSON
d['render'].pop(key, None)
d['render']['status'] = d['status']
# jobs from current_session generate percent completed
# jobs after loading server pull in a saved value. Have to check if callable object or not
percent_complete = self.render.percent_complete if isinstance(self.render.percent_complete, numbers.Number) \
else self.render.percent_complete()
d['render']['percent_complete'] = percent_complete
json_string = json.dumps(d, default=date_serializer)
except Exception as e:
logger.error("Error converting to JSON: {}".format(e))
return json_string
@classmethod
def generate_id(cls):
return str(uuid.uuid4()).split('-')[0]
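The comment in json() above notes that percent_complete is a live callable for jobs created in the current session but a plain number for jobs re-hydrated from disk, which is why the method checks before serializing. A minimal sketch of that serialization path, not part of the diff, where FakeWorker is a hypothetical stand-in for an object built by RenderWorkerFactory.create_worker():

# Minimal sketch, not part of the diff; FakeWorker only mimics the attributes RenderJob reads.
import json
from lib.render_job import RenderJob
from utilities.render_worker import RenderStatus

class FakeWorker:
    def __init__(self):
        self.renderer = 'Blender'
        self.input = '/tmp/scene.blend'
        self.output = '/tmp/scene.mp4'
        self.status = RenderStatus.NOT_STARTED
        self.percent_complete = 42.0   # a saved number, not the live callable

job = RenderJob(FakeWorker(), priority=1, owner='someone')
print(job.render_status())                                     # RenderStatus.NOT_STARTED
print(json.loads(job.json())['render']['percent_complete'])    # 42.0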

255  lib/render_manager.py  Executable file

@@ -0,0 +1,255 @@
import json
import logging
import os
import platform
from datetime import datetime
import psutil
import requests
from lib.render_job import RenderJob
from utilities.render_worker import RenderWorkerFactory, RenderStatus
logger = logging.getLogger()
JSON_FILE = 'server_state.json'
#todo: move history to sqlite db
class RenderManager:
render_queue = []
render_clients = []
maximum_renderer_instances = {'Blender': 2, 'After Effects': 1, 'ffmpeg': 4}
host_name = None
port = 8080
client_mode = False
server_hostname = None
last_saved_counts = {}
def __init__(self):
pass
@classmethod
def add_to_render_queue(cls, render_job, force_start=False, client=None):
if not client or render_job.client == cls.host_name:
logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job.render))
render_job.client = cls.host_name
cls.render_queue.append(render_job)
if force_start:
cls.start_job(render_job)
else:
cls.evaluate_queue()
else:
# todo: implement client rendering
logger.warning('remote client rendering not implemented yet')
@classmethod
def running_jobs(cls):
return cls.jobs_with_status(RenderStatus.RUNNING)
@classmethod
def pending_jobs(cls):
pending_jobs = cls.jobs_with_status(RenderStatus.NOT_STARTED)
pending_jobs.extend(cls.jobs_with_status(RenderStatus.SCHEDULED))
return pending_jobs
@classmethod
def jobs_with_status(cls, status, priority_sorted=False, include_archived=True):
found_jobs = [x for x in cls.render_queue if x.render_status() == status]
if not include_archived:
found_jobs = [x for x in found_jobs if not x.archived]
if priority_sorted:
found_jobs = sorted(found_jobs, key=lambda a: a.priority, reverse=False)
return found_jobs
@classmethod
def job_with_id(cls, job_id):
found_job = next((x for x in cls.render_queue if x.id == job_id), None)
return found_job
@classmethod
def clear_history(cls):
to_remove = [x for x in cls.render_queue if x.render_status() in [RenderStatus.CANCELLED,
RenderStatus.COMPLETED, RenderStatus.ERROR]]
for x in to_remove:
x.archived = True
cls.save_state()
@classmethod
def load_state(cls, json_path=None):
"""Load state history from JSON file"""
input_path = json_path or JSON_FILE
if os.path.exists(input_path):
with open(input_path) as f:
# load saved data
saved_state = json.load(f)
                cls.render_clients = saved_state.get('clients', [])  # clients are stored as a list
for job in saved_state.get('jobs', []):
# Identify renderer type and recreate Renderer object
job_render_object = RenderWorkerFactory.create_worker(job['renderer'], input_path=job['render']['input'], output_path=job['render']['output'])
# Load Renderer values
for key, val in job['render'].items():
if val and key in ['start_time', 'end_time']: # convert date strings back into date objects
job_render_object.__dict__[key] = datetime.fromisoformat(val)
else:
job_render_object.__dict__[key] = val
job_render_object.status = RenderStatus[job['status'].upper()]
job.pop('render', None)
# Create RenderJob with re-created Renderer object
                    new_job = RenderJob(job_render_object, priority=job['priority'], client=job['client'])
for key, val in job.items():
if key in ['date_created']: # convert date strings back to datetime objects
new_job.__dict__[key] = datetime.fromisoformat(val)
else:
new_job.__dict__[key] = val
new_job.__delattr__('status')
# Handle older loaded jobs that were cancelled before closing
if new_job.render_status() == RenderStatus.RUNNING:
new_job.render.status = RenderStatus.CANCELLED
# finally add back to render queue
cls.render_queue.append(new_job)
cls.last_saved_counts = cls.job_counts()
@classmethod
def save_state(cls, json_path=None):
"""Save state history to JSON file"""
try:
logger.debug("Saving Render History")
output = {'timestamp': datetime.now().isoformat(),
'jobs': [json.loads(j.json()) for j in cls.render_queue],
'clients': cls.render_clients}
output_path = json_path or JSON_FILE
with open(output_path, 'w') as f:
json.dump(output, f, indent=4)
cls.last_saved_counts = cls.job_counts()
except Exception as e:
logger.error("Error saving state JSON: {}".format(e))
@classmethod
def evaluate_queue(cls):
instances = cls.renderer_instances()
not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
if not_started:
for job in not_started:
renderer = job.render.renderer
higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < job.priority]
max_renderers = renderer in instances.keys() and instances[
renderer] >= cls.maximum_renderer_instances.get(renderer, 1)
if not max_renderers and not higher_priority_jobs:
cls.start_job(job)
scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True)
for job in scheduled:
if job.scheduled_start <= datetime.now():
cls.start_job(job)
if cls.last_saved_counts != cls.job_counts():
cls.save_state()
@classmethod
def start_job(cls, job):
logger.info('Starting {}render: {} - Priority {}'.format('scheduled ' if job.scheduled_start else '', job.name,
job.priority))
job.render.start()
@classmethod
def cancel_job(cls, job):
logger.info('Cancelling job ID: {}'.format(job.id))
if job.render_status() in [RenderStatus.NOT_STARTED, RenderStatus.RUNNING, RenderStatus.ERROR]:
job.render.stop()
job.render.status = RenderStatus.CANCELLED
return True
return False
@classmethod
def renderer_instances(cls):
from collections import Counter
all_instances = [x.render.renderer for x in cls.running_jobs()]
return Counter(all_instances)
@classmethod
def job_counts(cls):
job_counts = {}
for job_status in RenderStatus:
job_counts[job_status.value] = len(cls.jobs_with_status(job_status))
return job_counts
@classmethod
def status(cls):
stats = {"timestamp": datetime.now().isoformat(),
"platform": platform.platform(),
"cpu_percent": psutil.cpu_percent(percpu=False),
"cpu_percent_per_cpu": psutil.cpu_percent(percpu=True),
"cpu_count": psutil.cpu_count(),
"memory_total": psutil.virtual_memory().total,
"memory_available": psutil.virtual_memory().available,
"memory_percent": psutil.virtual_memory().percent,
"job_counts": cls.job_counts(),
"host_name": cls.host_name
}
return stats
@classmethod
def all_jobs(cls):
all_jobs = [x for x in cls.render_queue if not x.archived]
return all_jobs
@classmethod
def register_client(cls, hostname):
#todo: check to make sure not adding ourselves
success = False
if hostname in cls.render_clients:
logger.warning(f"Client '{hostname}' already registered")
return success
try:
response = requests.get(f"http://{hostname}:8080/status", timeout=3)
if response.ok:
cls.render_clients.append(hostname)
logger.info(f"Client '{hostname}' successfully registered")
success = True
cls.save_state()
except requests.ConnectionError as e:
logger.error(f"Cannot connect to client at hostname: {hostname}")
return success
@classmethod
def unregister_client(cls, hostname):
success = False
if hostname in cls.render_clients and hostname != cls.host_name:
cls.render_clients.remove(hostname)
logger.info(f"Client '{hostname}' successfully unregistered")
success = True
return success
@staticmethod
def is_client_available(client_hostname, timeout=3):
try:
response = requests.get(f"http://{client_hostname}:8080/status", timeout=timeout)
if response.ok:
return True
except requests.ConnectionError as e:
pass
return False
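RenderManager keeps all of its state at class level, so the module acts as a singleton: load_state() re-hydrates RenderJob objects from server_state.json, and evaluate_queue() starts NOT_STARTED jobs in priority order, capped per renderer by maximum_renderer_instances, plus any SCHEDULED jobs whose start time has passed. A minimal sketch of driving it directly, not part of the diff, assuming a server_state.json written by a previous run:

# Minimal sketch, not part of the diff; normally start_server() performs this setup.
import socket
from lib.render_manager import RenderManager
from utilities.render_worker import RenderStatus

RenderManager.host_name = socket.gethostname()
RenderManager.load_state()      # rebuild RenderJob objects from server_state.json
for job in RenderManager.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True):
    print(job.id, job.name, job.priority)
RenderManager.evaluate_queue()  # start whatever fits within the per-renderer limits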

293  server.py  Executable file

@@ -0,0 +1,293 @@
#!/usr/bin/env python3
import json
import logging
import os
import socket
import threading
import time
from datetime import datetime
import requests
import yaml
from flask import Flask, jsonify, request, render_template
from werkzeug.utils import secure_filename
from lib.render_job import RenderJob
from lib.render_manager import RenderManager
from utilities.render_worker import RenderWorkerFactory, string_to_status
logger = logging.getLogger()
app = Flask(__name__)
@app.get('/jobs')
def jobs_json():
return [json.loads(x.json()) for x in RenderManager.render_queue if not x.archived]
@app.get('/jobs/<status_val>')
def filtered_jobs_json(status_val):
state = string_to_status(status_val)
jobs = [json.loads(x.json()) for x in RenderManager.jobs_with_status(state)]
if jobs:
return jobs
else:
        return {'error': f'Cannot find jobs with status {status_val}'}, 400
@app.get('/job_status/<job_id>')
def get_job_status(job_id):
found_job = RenderManager.job_with_id(job_id)
if found_job:
logger.info("Founbd jobs")
return found_job.json()
else:
return {'error': f'Cannot find job with ID {job_id}'}, 400
@app.post('/register_client')
def register_client():
client_hostname = request.values['hostname']
x = RenderManager.register_client(client_hostname)
return "Success" if x else "Fail"
@app.post('/unregister_client')
def unregister_client():
client_hostname = request.values['hostname']
x = RenderManager.unregister_client(client_hostname)
return "Success" if x else "Fail"
@app.get('/full_status')
def full_status():
full_results = {'timestamp': datetime.now().isoformat(), 'servers': {}}
try:
for client_hostname in RenderManager.render_clients:
is_online = False
if client_hostname == RenderManager.host_name:
snapshot_results = snapshot()
is_online = True
else:
snapshot_results = {}
try:
snapshot_request = requests.get(f'http://{client_hostname}:8080/snapshot', timeout=1)
snapshot_results = snapshot_request.json()
is_online = snapshot_request.ok
except requests.ConnectionError as e:
pass
server_data = {'status': snapshot_results.get('status', {}), 'jobs': snapshot_results.get('jobs', {}),
'is_online': is_online}
full_results['servers'][client_hostname] = server_data
except Exception as e:
logger.error(f"Exception fetching full status: {e}")
return full_results
@app.get('/snapshot')
def snapshot():
server_status = RenderManager.status()
server_jobs = [json.loads(x.json()) for x in RenderManager.render_queue if not x.archived]
server_data = {'status': server_status, 'jobs': server_jobs, 'timestamp': datetime.now().isoformat()}
return server_data
@app.post('/add_job')
def add_job():
"""Create new job and add to server render queue"""
renderer = request.json.get("renderer", None)
input_path = request.json.get("input", None)
output_path = request.json.get("output", None)
priority = request.json.get('priority', 2)
args = request.json.get('args', None)
client = request.json.get('client', RenderManager.host_name)
force_start = request.json.get('force_start', False)
if None in [renderer, input_path, output_path]:
return {'error': 'missing required parameters'}, 400
if client == RenderManager.host_name:
logger.info(f"Creating job locally - {input_path}")
# Local Renders
try:
render_job = RenderWorkerFactory.create_worker(renderer, input_path, output_path, args)
except ValueError as e:
logger.exception(e)
return {'error': str(e)}, 400
new_job = RenderJob(render_job, priority=priority)
RenderManager.add_to_render_queue(new_job, force_start=force_start)
return new_job.json()
elif client in RenderManager.render_clients:
# see if host is available
if RenderManager.is_client_available(client):
if args and renderer == 'blender' and args.get('pack_files', False):
from utilities.blender_worker import pack_blender_files
new_path = pack_blender_files(path=input_path)
if new_path:
logger.info(f'Packed Blender file successfully: {new_path}')
input_path = new_path
else:
err_msg = f'Failed to pack Blender file: {input_path}'
logger.error(err_msg)
return {'error': err_msg}, 400
# call uploader on remote client
try:
                with open(input_path, 'rb') as job_file:   # close the file once the upload finishes
                    job_files = {'file': job_file}
                    job_data = request.json
                    job_data['input'] = input_path
                    logger.info(f"Uploading file {input_path} to client {client}")
                    response = requests.post(f"http://{client}:8080/uploader", files=job_files, data=job_data)
if response.ok:
logger.info("Job submitted successfully!")
return response.json() if response.json() else "Job ok"
else:
                    return {'error': 'Job rejected by client'}, 400
except requests.ConnectionError as e:
err_msg = f"Error submitting job to client: {client}"
logger.error(err_msg)
                return {'error': err_msg}, 400
else:
# client is not available
err_msg = f"Render client '{client}' is unreachable"
logger.error(err_msg)
            return {'error': err_msg}, 400
else:
err_msg = f"Unknown render client: '{client}'"
logger.error(err_msg)
        return {'error': err_msg}, 400
@app.get('/cancel_job')
def cancel_job():
job_id = request.args.get('id', None)
confirm = request.args.get('confirm', False)
if not job_id:
return {'error': 'job id not found'}, 400
elif not confirm:
return {'error': 'confirmation required'}, 400
else:
found = [x for x in RenderManager.render_queue if x.id == job_id]
if len(found) > 1:
            logger.error('Multiple jobs found for ID {}'.format(job_id))
            return jsonify({'error': 'multiple jobs found for ID {}'.format(job_id)}), 400
elif found:
success = RenderManager.cancel_job(found[0])
return jsonify({'result': success})
return {'error': 'job not found'}, 400
@app.get('/clear_history')
def clear_history():
RenderManager.clear_history()
return {'result': True}
@app.route('/status')
def status():
return RenderManager.status()
@app.route('/')
def default():
return "Server running"
@app.route('/upload')
def upload_file_page():
return render_template('upload.html')
@app.route('/uploader', methods=['POST'])
def upload_file():
if request.method == 'POST':
try:
uploaded_file = request.files['file']
if not uploaded_file.filename:
return {'error': 'no file uploaded'}
# generate directory to use
logger.debug(f"Receiving uploaded file {uploaded_file.filename}")
new_id = RenderJob.generate_id()
job_dir = os.path.join(app.config['UPLOAD_FOLDER'], new_id + "-" + uploaded_file.filename)
if not os.path.exists(job_dir):
os.makedirs(job_dir)
local_path = os.path.join(job_dir, secure_filename(uploaded_file.filename))
uploaded_file.save(local_path)
renderer = request.values['renderer']
# todo: finish output_path - currently placeholder data
output_path = os.path.join(job_dir, uploaded_file.filename + "-output.mp4")
try:
render_job = RenderWorkerFactory.create_worker(renderer, local_path, output_path,
args=request.values.get('args', None))
new_job = RenderJob(render_job, custom_id=new_id)
RenderManager.add_to_render_queue(new_job)
return new_job.json()
except ValueError as e:
logger.exception(e)
return {'error': str(e)}, 400
except Exception as e:
logger.exception(e)
return {'error': 'unknown error'}
def start_server(background_thread=False):
def eval_loop(delay_sec=1):
while True:
RenderManager.evaluate_queue()
time.sleep(delay_sec)
logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S',
level=logging.INFO)
with open('config.yaml') as f:
config = yaml.load(f, Loader=yaml.FullLoader)
app.config['UPLOAD_FOLDER'] = config['upload_folder']
app.config['MAX_CONTENT_PATH'] = config['max_content_path']
# app.config['RESULT_STATIC_PATH'] = 'static'
# Get hostname and render clients
RenderManager.host_name = socket.gethostname()
app.config['HOSTNAME'] = RenderManager.host_name
if not RenderManager.render_clients:
RenderManager.render_clients = [RenderManager.host_name]
# disable most Flask logging
flask_log = logging.getLogger('werkzeug')
flask_log.setLevel(config.get('flask_log_level', 'ERROR').upper())
# Setup the RenderManager object
RenderManager.load_state()
thread = threading.Thread(target=eval_loop, kwargs={'delay_sec': config.get('queue_eval_seconds', 1)}, daemon=True)
thread.start()
    if background_thread:
        # run Flask in a separate thread so the caller (e.g. the dashboard) can keep going;
        # joining here would block until the server exits and defeat background mode
        server_thread = threading.Thread(
            target=lambda: app.run(host='0.0.0.0', port=RenderManager.port, debug=False, use_reloader=False))
        server_thread.start()
else:
app.run(host='0.0.0.0', port=RenderManager.port, debug=config.get('flask_debug_enable', False),
use_reloader=False)
if __name__ == '__main__':
start_server()
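The HTTP surface is essentially unchanged by the split, so existing clients keep working. A minimal sketch of submitting and inspecting a job from another process with requests, not part of the diff; the host, paths and priority are assumptions:

# Minimal sketch, not part of the diff.
import requests

base = 'http://localhost:8080'
payload = {'renderer': 'blender', 'input': '/renders/scene.blend',
           'output': '/renders/scene.mp4', 'priority': 1}
resp = requests.post(f'{base}/add_job', json=payload, timeout=5)
print(resp.status_code, resp.text)                         # the new job serialized by RenderJob.json()

print(requests.get(f'{base}/jobs', timeout=5).json())      # all non-archived jobs
print(requests.get(f'{base}/status', timeout=5).json())    # host stats plus job counts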

View File

@@ -1,32 +1,9 @@
<html>
<body>
<h1>Upload a file</h1>
<form action="/uploader" method="POST"
enctype="multipart/form-data">
<div>
<input type="file" name="file"/><br>
</div>
<div>
<label for="renderer">Renderer:</label>
<select id="renderer" name="renderer" onchange="javascript:rendererCheck();">
<option value="aerender">After Effects</option>
<option value="blender">Blender</option>
<option value="ffmpeg">FFMPEG</option>
</select>
</div>
<br>
<div id="show_blender">
Blender Engine:
<input type="radio" id="cycles" name="blender_engine" value="CYCLES" checked>
<label for="cycles">Cycles</label>
<input type="radio" id="eevee" name="blender_engine" value="BLENDER_EEVEE">
<label for="eevee">Eevee</label>
</div>
<br>
<input type="submit"/>
</form>
</body>
<body>
<form action = "http://localhost:5000/uploader" method = "POST"
enctype = "multipart/form-data">
<input type = "file" name = "file" />
<input type = "submit"/>
</form>
</body>
</html>
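Note that the simplified form posts only the file, while the /uploader handler in server.py also reads request.values['renderer'], so a programmatic upload should include that field. A minimal sketch with requests, not part of the diff; the host, port and file path are assumptions:

# Minimal sketch, not part of the diff.
import requests

with open('/renders/scene.blend', 'rb') as f:
    resp = requests.post('http://localhost:8080/uploader',
                         files={'file': f},
                         data={'renderer': 'blender'})
print(resp.status_code, resp.text)   # RenderJob JSON on success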

0  utilities/__init__.py  Normal file

View File

@@ -1,596 +0,0 @@
#!/usr/bin/env python
import argparse
import json
import logging
import os
import platform
import socket
import threading
import time
import uuid
from datetime import datetime
import psutil
import requests
import yaml
from flask import Flask, jsonify, request, render_template
from werkzeug.utils import secure_filename
from utilities.render_worker import RenderWorkerFactory, RenderStatus, string_to_status
app = Flask(__name__)
logger = logging.getLogger()
local_hostname = socket.gethostname()
JSON_FILE = 'server_state.json'
#todo: move history to sqlite db
class RenderJob:
def __init__(self, render, priority=2, owner=None, client=None, notify=None, custom_id=None):
self.id = custom_id or self.generate_id()
self.owner = owner
self.render = render
self.priority = priority
self.client = client
self.notify = notify
self.date_created = datetime.now()
self.scheduled_start = None
self.renderer = render.renderer
self.name = os.path.basename(render.input) + '_' + self.date_created.isoformat()
self.archived = False
def render_status(self):
"""Returns status of render job"""
try:
if self.scheduled_start and self.render.status == RenderStatus.NOT_STARTED:
return RenderStatus.SCHEDULED
else:
return self.render.status
except Exception as e:
logger.warning("render_status error: {}".format(e))
return RenderStatus.ERROR
def json(self):
"""Converts RenderJob into JSON format"""
import numbers
def date_serializer(o):
if isinstance(o, datetime):
return o.isoformat()
json_string = ''
try:
d = self.__dict__.copy()
d['status'] = self.render_status().value
d['render'] = self.render.__dict__.copy()
for key in ['thread', 'process']: # remove unwanted keys from JSON
d['render'].pop(key, None)
d['render']['status'] = d['status']
# jobs from current_session generate percent completed
# jobs after loading server pull in a saved value. Have to check if callable object or not
percent_complete = self.render.percent_complete if isinstance(self.render.percent_complete, numbers.Number) \
else self.render.percent_complete()
d['render']['percent_complete'] = percent_complete
json_string = json.dumps(d, default=date_serializer)
except Exception as e:
logger.error("Error converting to JSON: {}".format(e))
return json_string
@classmethod
def generate_id(cls):
return str(uuid.uuid4()).split('-')[0]
class RenderServer:
render_queue = []
render_clients = []
maximum_renderer_instances = {'Blender': 2, 'After Effects': 1, 'ffmpeg': 4}
host_name = None
port = 8080
client_mode = False
server_hostname = None
last_saved_counts = {}
def __init__(self):
pass
@classmethod
def add_to_render_queue(cls, render_job, force_start=False, client=None):
if not client or render_job.client == cls.host_name:
logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job.render))
render_job.client = cls.host_name
cls.render_queue.append(render_job)
if force_start:
cls.start_job(render_job)
else:
cls.evaluate_queue()
else:
# todo: implement client rendering
logger.warning('remote client rendering not implemented yet')
@classmethod
def running_jobs(cls):
return cls.jobs_with_status(RenderStatus.RUNNING)
@classmethod
def pending_jobs(cls):
pending_jobs = cls.jobs_with_status(RenderStatus.NOT_STARTED)
pending_jobs.extend(cls.jobs_with_status(RenderStatus.SCHEDULED))
return pending_jobs
@classmethod
def jobs_with_status(cls, status, priority_sorted=False, include_archived=True):
found_jobs = [x for x in cls.render_queue if x.render_status() == status]
if not include_archived:
found_jobs = [x for x in found_jobs if not x.archived]
if priority_sorted:
found_jobs = sorted(found_jobs, key=lambda a: a.priority, reverse=False)
return found_jobs
@classmethod
def job_with_id(cls, job_id):
found_job = next((x for x in cls.render_queue if x.id == job_id), None)
return found_job
@classmethod
def clear_history(cls):
to_remove = [x for x in cls.render_queue if x.render_status() in [RenderStatus.CANCELLED,
RenderStatus.COMPLETED, RenderStatus.ERROR]]
for x in to_remove:
x.archived = True
cls.save_state()
@classmethod
def load_state(cls, json_path=None):
"""Load state history from JSON file"""
input_path = json_path or JSON_FILE
if os.path.exists(input_path):
with open(input_path) as f:
# load saved data
saved_state = json.load(f)
cls.render_clients = saved_state.get('clients', {})
for job in saved_state.get('jobs', []):
# Identify renderer type and recreate Renderer object
job_render_object = RenderWorkerFactory.create_worker(job['renderer'], input_path=job['render']['input'], output_path=job['render']['output'])
# Load Renderer values
for key, val in job['render'].items():
if val and key in ['start_time', 'end_time']: # convert date strings back into date objects
job_render_object.__dict__[key] = datetime.fromisoformat(val)
else:
job_render_object.__dict__[key] = val
job_render_object.status = RenderStatus[job['status'].upper()]
job.pop('render', None)
# Create RenderJob with re-created Renderer object
new_job = RenderJob(job_render_object, job['priority'], job['client'])
for key, val in job.items():
if key in ['date_created']: # convert date strings back to datetime objects
new_job.__dict__[key] = datetime.fromisoformat(val)
else:
new_job.__dict__[key] = val
new_job.__delattr__('status')
# Handle older loaded jobs that were cancelled before closing
if new_job.render_status() == RenderStatus.RUNNING:
new_job.render.status = RenderStatus.CANCELLED
# finally add back to render queue
cls.render_queue.append(new_job)
cls.last_saved_counts = cls.job_counts()
@classmethod
def save_state(cls, json_path=None):
"""Save state history to JSON file"""
try:
logger.debug("Saving Render History")
output = {'timestamp': datetime.now().isoformat(),
'jobs': [json.loads(j.json()) for j in cls.render_queue],
'clients': cls.render_clients}
output_path = json_path or JSON_FILE
with open(output_path, 'w') as f:
json.dump(output, f, indent=4)
cls.last_saved_counts = cls.job_counts()
except Exception as e:
logger.error("Error saving state JSON: {}".format(e))
@classmethod
def evaluate_queue(cls):
instances = cls.renderer_instances()
not_started = cls.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
if not_started:
for job in not_started:
renderer = job.render.renderer
higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < job.priority]
max_renderers = renderer in instances.keys() and instances[
renderer] >= cls.maximum_renderer_instances.get(renderer, 1)
if not max_renderers and not higher_priority_jobs:
cls.start_job(job)
scheduled = cls.jobs_with_status(RenderStatus.SCHEDULED, priority_sorted=True)
for job in scheduled:
if job.scheduled_start <= datetime.now():
cls.start_job(job)
if cls.last_saved_counts != cls.job_counts():
cls.save_state()
@classmethod
def start_job(cls, job):
logger.info('Starting {}render: {} - Priority {}'.format('scheduled ' if job.scheduled_start else '', job.name,
job.priority))
job.render.start()
@classmethod
def cancel_job(cls, job):
logger.info('Cancelling job ID: {}'.format(job.id))
if job.render_status() in [RenderStatus.NOT_STARTED, RenderStatus.RUNNING, RenderStatus.ERROR]:
job.render.stop()
job.render.status = RenderStatus.CANCELLED
return True
return False
@classmethod
def renderer_instances(cls):
from collections import Counter
all_instances = [x.render.renderer for x in cls.running_jobs()]
return Counter(all_instances)
@classmethod
def job_counts(cls):
job_counts = {}
for job_status in RenderStatus:
job_counts[job_status.value] = len(RenderServer.jobs_with_status(job_status))
return job_counts
@classmethod
def status(cls):
stats = {"timestamp": datetime.now().isoformat(),
"platform": platform.platform(),
"cpu_percent": psutil.cpu_percent(percpu=False),
"cpu_percent_per_cpu": psutil.cpu_percent(percpu=True),
"cpu_count": psutil.cpu_count(),
"memory_total": psutil.virtual_memory().total,
"memory_available": psutil.virtual_memory().available,
"memory_percent": psutil.virtual_memory().percent,
"job_counts": RenderServer.job_counts(),
"host_name": RenderServer.host_name
}
return stats
@classmethod
def all_jobs(cls):
all_jobs = [x for x in RenderServer.render_queue if not x.archived]
return all_jobs
@classmethod
def register_client(cls, hostname):
success = False
if hostname in cls.render_clients:
logger.warning(f"Client '{hostname}' already registered")
return success
try:
response = requests.get(f"http://{hostname}:8080/status", timeout=3)
if response.ok:
cls.render_clients.append(hostname)
logger.info(f"Client '{hostname}' successfully registered")
success = True
cls.save_state()
except requests.ConnectionError as e:
logger.error(f"Cannot connect to client at hostname: {hostname}")
return success
@classmethod
def unregister_client(cls, hostname):
success = False
if hostname in cls.render_clients and hostname != cls.host_name:
cls.render_clients.remove(hostname)
logger.info(f"Client '{hostname}' successfully unregistered")
success = True
return success
@staticmethod
def is_client_available(client_hostname, timeout=3):
try:
response = requests.get(f"http://{client_hostname}:8080/status", timeout=timeout)
if response.ok:
return True
except requests.ConnectionError as e:
pass
return False
@classmethod
def start(cls, background_thread=False):
def eval_loop(delay_sec=1):
while True:
cls.evaluate_queue()
time.sleep(delay_sec)
logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S',
level=logging.INFO)
with open('config.yaml') as f:
config = yaml.load(f, Loader=yaml.FullLoader)
app.config['UPLOAD_FOLDER'] = config['upload_folder']
app.config['MAX_CONTENT_PATH'] = config['max_content_path']
app.config['RESULT_STATIC_PATH'] = 'static/'
# Get hostname and render clients
cls.host_name = socket.gethostname()
if not RenderServer.render_clients:
cls.render_clients = [RenderServer.host_name]
if not cls.client_mode:
logger.info(f"Starting Zordon in Server Mode - {cls.host_name}")
else:
logger.info(f"Starting Zordon in Client Mode - {cls.host_name}")
# disable most Flask logging
flask_log = logging.getLogger('werkzeug')
flask_log.setLevel(config.get('flask_log_level', 'ERROR').upper())
# Setup the RenderServer object
cls.load_state()
thread = threading.Thread(target=eval_loop, kwargs={'delay_sec': config.get('queue_eval_seconds', 1)}, daemon=True)
thread.start()
if background_thread:
server_thread = threading.Thread(
target=lambda: app.run(host='0.0.0.0', port=cls.port, debug=False, use_reloader=False))
server_thread.start()
server_thread.join()
else:
app.run(host='0.0.0.0', port=cls.port, debug=config.get('flask_debug_enable', False),
use_reloader=False)
@app.get('/jobs')
def jobs_json():
return [json.loads(x.json()) for x in RenderServer.render_queue if not x.archived]
@app.get('/jobs/<status_val>')
def filtered_jobs_json(status_val):
state = string_to_status(status_val)
jobs = [json.loads(x.json()) for x in RenderServer.jobs_with_status(state)]
return jobs
@app.get('/job_status/<job_id>')
def get_job_status(job_id):
found_job = RenderServer.job_with_id(job_id)
if found_job:
return found_job.json()
else:
return None
@app.post('/register_client')
def register_client():
client_hostname = request.values['hostname']
x = RenderServer.register_client(client_hostname)
return "Success" if x else "Fail"
@app.post('/unregister_client')
def unregister_client():
client_hostname = request.values['hostname']
x = RenderServer.unregister_client(client_hostname)
return "Success" if x else "Fail"
@app.get('/full_status')
def full_status():
full_results = {'timestamp': datetime.now().isoformat(), 'servers': {}}
try:
for client_hostname in RenderServer.render_clients:
is_online = False
if client_hostname == local_hostname:
snapshot_results = snapshot()
is_online = True
else:
snapshot_results = {}
try:
snapshot_request = requests.get(f'http://{client_hostname}:8080/snapshot', timeout=1)
snapshot_results = snapshot_request.json()
is_online = snapshot_request.ok
except requests.ConnectionError as e:
pass
server_data = {'status': snapshot_results.get('status', {}), 'jobs': snapshot_results.get('jobs', {}),
'is_online': is_online}
full_results['servers'][client_hostname] = server_data
except Exception as e:
logger.error(f"Exception fetching full status: {e}")
return full_results
@app.get('/snapshot')
def snapshot():
server_status = RenderServer.status()
server_jobs = [json.loads(x.json()) for x in RenderServer.render_queue if not x.archived]
server_data = {'status': server_status, 'jobs': server_jobs, 'timestamp': datetime.now().isoformat()}
return server_data
@app.post('/add_job')
def add_job():
"""Create new job and add to server render queue"""
renderer = request.json.get("renderer", None)
input_path = request.json.get("input", None)
output_path = request.json.get("output", None)
priority = request.json.get('priority', 2)
args = request.json.get('args', None)
client = request.json.get('client', RenderServer.host_name)
force_start = request.json.get('force_start', False)
if None in [renderer, input_path, output_path]:
return {'error': 'missing required parameters'}, 400
if client == RenderServer.host_name:
# Local Renders
try:
render_job = RenderWorkerFactory.create_worker(renderer, input_path, output_path, args)
except ValueError as e:
logger.exception(e)
return {'error': str(e)}, 400
new_job = RenderJob(render_job, priority=priority)
RenderServer.add_to_render_queue(new_job, force_start=force_start)
return new_job.json()
elif client in RenderServer.render_clients:
# see if host is available
if RenderServer.is_client_available(client):
if args and renderer == 'blender' and args.get('pack_files', False):
from utilities.blender_worker import pack_blender_files
new_path = pack_blender_files(path=input_path)
if new_path:
logger.info(f'Packed Blender file successfully: {new_path}')
input_path = new_path
else:
err_msg = f'Failed to pack Blender file: {input_path}'
logger.error(err_msg)
return {'error': err_msg}, 400
# call uploader on remote client
try:
job_files = {'file': open(input_path, 'rb')}
job_data = request.json
job_data['input'] = input_path
response = requests.post(f"http://{client}:8080/uploader", files=job_files, data=job_data)
if response.ok:
logger.info("Job submitted successfully!")
return response.json() if response.json() else "Job ok"
else:
return {'error', 'Job rejected by client'}, 400
except requests.ConnectionError as e:
err_msg = f"Error submitting job to client: {client}"
logger.error(err_msg)
return {'error', err_msg}, 400
else:
# client is not available
err_msg = f"Render client '{client}' is unreachable"
logger.error(err_msg)
return {'error', err_msg}, 400
else:
err_msg = f"Unknown render client: '{client}'"
logger.error(err_msg)
return {'error', err_msg}, 400
@app.get('/cancel_job')
def cancel_job():
job_id = request.args.get('id', None)
confirm = request.args.get('confirm', False)
if not job_id:
return {'error': 'job id not found'}, 400
elif not confirm:
return {'error': 'confirmation required'}, 400
else:
found = [x for x in RenderServer.render_queue if x.id == job_id]
if len(found) > 1:
# logger.error('Multiple jobs found for ID {}'.format(job_id))
return jsonify({'error': 'multiple jobs found for ID {}'.format(job_id)})
elif found:
success = RenderServer.cancel_job(found[0])
return jsonify({'result': success})
return {'error': 'job not found'}, 400
@app.get('/clear_history')
def clear_history():
RenderServer.clear_history()
return {'result': True}
@app.route('/status')
def status():
return RenderServer.status()
@app.route('/')
def default():
return "Server running"
@app.route('/upload')
def upload_file_page():
return render_template('upload.html')
@app.route('/uploader', methods=['POST'])
def upload_file():
if request.method == 'POST':
try:
uploaded_file = request.files['file']
if not uploaded_file.filename:
return {'error': 'no file uploaded'}
# generate directory to use
logger.debug(f"Receiving uploaded file {uploaded_file.filename}")
new_id = RenderJob.generate_id()
job_dir = os.path.join(app.config['UPLOAD_FOLDER'], new_id + "-" + uploaded_file.filename)
if not os.path.exists(job_dir):
os.makedirs(job_dir)
local_path = os.path.join(job_dir, secure_filename(uploaded_file.filename))
uploaded_file.save(local_path)
renderer = request.values['renderer']
# todo: finish output_path - currently placeholder data
output_path = os.path.join(job_dir, uploaded_file.filename + "-output.mp4")
try:
render_job = RenderWorkerFactory.create_worker(renderer, local_path, output_path,
args=request.values.get('args', None))
new_job = RenderJob(render_job, custom_id=new_id)
RenderServer.add_to_render_queue(new_job)
return new_job.json()
except ValueError as e:
logger.exception(e)
return {'error': str(e)}, 400
except Exception as e:
logger.exception(e)
return {'error': 'unknown error'}
if __name__ == '__main__':
RenderServer.start()