Assign frame ranges to servers based on their CPU count (#19)

* Expose renderer availability in status api

* Remove redundant is_available_for_job API call

* New server split logic by cpu and moved to server_helper.py

* Remove old dead code

* Add RenderStatus.WAITING to proxy categories
This commit is contained in:
2023-06-16 00:04:02 -05:00
committed by GitHub
parent 0080cdb371
commit 76e413c18d
5 changed files with 115 additions and 81 deletions

View File

@@ -5,6 +5,7 @@ from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from .workers.base_worker import RenderStatus, BaseRenderWorker, Base from .workers.base_worker import RenderStatus, BaseRenderWorker, Base
from .workers.worker_factory import RenderWorkerFactory
logger = logging.getLogger() logger = logging.getLogger()
@@ -81,7 +82,10 @@ class RenderQueue:
cls.session.commit() cls.session.commit()
@classmethod @classmethod
def is_available_for_job(cls, renderer, priority): def is_available_for_job(cls, renderer, priority=2):
if not RenderWorkerFactory.class_for_name(renderer).engine.renderer_path():
return False
instances = cls.renderer_instances() instances = cls.renderer_instances()
higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < priority] higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < priority]
max_renderers = renderer in instances.keys() and instances[renderer] >= cls.maximum_renderer_instances.get(renderer, 1) max_renderers = renderer in instances.keys() and instances[renderer] >= cls.maximum_renderer_instances.get(renderer, 1)

View File

@@ -1,13 +1,9 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import json import json
import logging
import os
import pathlib import pathlib
import platform import platform
import shutil import shutil
import socket
import ssl import ssl
import threading
import time import time
import zipfile import zipfile
from datetime import datetime from datetime import datetime
@@ -15,15 +11,13 @@ from urllib.request import urlretrieve
from zipfile import ZipFile from zipfile import ZipFile
import json2html import json2html
import psutil
import yaml import yaml
from flask import Flask, request, render_template, send_file, after_this_request, Response, redirect, url_for, abort from flask import Flask, request, render_template, send_file, after_this_request, Response, redirect, url_for, abort
from werkzeug.utils import secure_filename from werkzeug.utils import secure_filename
from lib.render_queue import RenderQueue, JobNotFoundError from lib.render_queue import RenderQueue, JobNotFoundError
from lib.server.server_proxy import RenderServerProxy
from lib.server.zeroconf_server import ZeroconfServer from lib.server.zeroconf_server import ZeroconfServer
from lib.utilities.server_helper import generate_thumbnail_for_job from lib.utilities.server_helper import *
from lib.workers.base_worker import string_to_status, RenderStatus from lib.workers.base_worker import string_to_status, RenderStatus
from lib.workers.worker_factory import RenderWorkerFactory from lib.workers.worker_factory import RenderWorkerFactory
@@ -264,23 +258,6 @@ def detected_clients():
return server.config['ZEROCONF_SERVER'].found_clients() return server.config['ZEROCONF_SERVER'].found_clients()
@server.route('/api/is_available_for_job', methods=['POST', 'GET'])
def available_for_job():
    """
    Check queue to see if it can take a job with a given renderer and priority.

    Query params:
        renderer: name of a supported renderer (required).
        priority: integer job priority (required).

    Returns:
        JSON with 'is_available' plus the echoed parameters and HTTP 200,
        or an error payload with HTTP 400 when parameters are missing or invalid.
    """
    renderer = request.args.get('renderer')
    priority = request.args.get('priority')
    if not renderer or not priority:
        return {"error": "Both 'renderer' and 'priority' parameters are required"}, 400
    if renderer not in RenderWorkerFactory.supported_renderers():
        return {"error": f"Unsupported renderer: {renderer}"}, 400
    try:
        # Query args arrive as strings; RenderQueue compares priorities
        # numerically (x.priority < priority), so an uncast string would
        # raise TypeError at queue time. Validate and cast here instead.
        priority = int(priority)
    except ValueError:
        return {"error": f"Invalid priority: {priority}"}, 400
    return {"is_available": RenderQueue.is_available_for_job(renderer, priority),
            'renderer': renderer, 'priority': priority}, 200
@server.post('/api/add_job') @server.post('/api/add_job')
def add_job_handler(): def add_job_handler():
# initial handling of raw data # initial handling of raw data
@@ -441,82 +418,51 @@ def create_subjobs(worker, job_data, project_path):
# Check availability # Check availability
local_hostname = server.config['HOSTNAME'] local_hostname = server.config['HOSTNAME']
found_servers = [x for x in server.config['ZEROCONF_SERVER'].found_clients() if local_hostname not in x] found_servers = [x for x in server.config['ZEROCONF_SERVER'].found_clients() if local_hostname not in x]
available_servers = [local_hostname] + [hostname for hostname in found_servers if
RenderServerProxy(hostname).is_available_for_job(renderer=worker.renderer,
priority=worker.priority)]
if len(available_servers) <= 1: subjob_servers = find_available_servers(found_servers, worker.renderer, worker.start_frame, worker.end_frame)
logger.debug("No available servers to split job with. Skipping subjob creation.")
return
logger.info(f"Found {len(available_servers) - 1} additional available servers | "
f"Breaking up job into {len(available_servers)} jobs")
logger.debug(f"Available servers: {available_servers}")
def divide_frames(start_frame, end_frame, num_servers):
    """Split the inclusive range [start_frame, end_frame] into contiguous chunks.

    Produces up to ``num_servers`` (start, end) tuples. The remainder is
    absorbed one frame at a time by the earliest chunks, so sizes differ by
    at most one; when there are more servers than frames, trailing servers
    simply receive no chunk.
    """
    total = end_frame - start_frame + 1
    base, extra = divmod(total, num_servers)
    ranges = []
    cursor = start_frame
    for idx in range(num_servers):
        # The first `extra` chunks carry one additional frame.
        size = base + (1 if idx < extra else 0)
        if size > 0:
            ranges.append((cursor, cursor + size - 1))
            cursor += size
    return ranges
# Calculate respective frames for each server
server_frame_ranges = {}
for idx, frame_range in enumerate(divide_frames(worker.start_frame, worker.end_frame, len(available_servers))):
server_frame_ranges[available_servers[idx]] = frame_range
logger.info(f"Job {worker.id} split plan: {server_frame_ranges}")
# Prep and submit these sub-jobs # Prep and submit these sub-jobs
logger.info(f"Job {worker.id} split plan: {subjob_servers}")
submission_results = {} submission_results = {}
try: try:
for server_hostname, frame_range in server_frame_ranges.items(): for server_data in subjob_servers:
server_hostname = server_data['hostname']
if server_hostname != local_hostname: if server_hostname != local_hostname:
subjob = job_data.copy() subjob = job_data.copy()
subjob['name'] = f"{worker.name}[{frame_range[0]}-{frame_range[-1]}]" subjob['name'] = f"{worker.name}[{server_data['frame_range'][0]}-{server_data['frame_range'][-1]}]"
subjob['parent'] = f"{worker.id}@{local_hostname}" subjob['parent'] = f"{worker.id}@{local_hostname}"
subjob['start_frame'] = frame_range[0] subjob['start_frame'] = server_data['frame_range'][0]
subjob['end_frame'] = frame_range[-1] subjob['end_frame'] = server_data['frame_range'][-1]
logger.debug(f"Posting subjob with frames {subjob['start_frame']}-" logger.debug(f"Posting subjob with frames {subjob['start_frame']}-"
f"{subjob['end_frame']} to {server_hostname}") f"{subjob['end_frame']} to {server_hostname}")
post_results = RenderServerProxy(server_hostname).post_job_to_server( post_results = RenderServerProxy(server_hostname).post_job_to_server(
file_path=project_path, job_list=[subjob]) file_path=project_path, job_list=[subjob])
if post_results.ok: if post_results.ok:
submission_results[server_hostname] = post_results.json()[0] server_data['submission_results'] = post_results.json()[0]
else: else:
logger.error(f"Failed to create subjob on {server_hostname}") logger.error(f"Failed to create subjob on {server_hostname}")
break break
else:
# check that job posts were all successful.
if len(submission_results) != (len(server_frame_ranges) - 1):
raise ValueError("Failed to create all subjobs") # look into recalculating job numbers and use existing jobs
# truncate parent render_job # truncate parent render_job
worker.end_frame = min(server_frame_ranges[local_hostname][-1], worker.end_frame) worker.start_frame = max(server_data['frame_range'][0], worker.start_frame)
worker.end_frame = min(server_data['frame_range'][-1], worker.end_frame)
logger.info(f"Local job now rendering from {worker.start_frame} to {worker.end_frame}") logger.info(f"Local job now rendering from {worker.start_frame} to {worker.end_frame}")
# check that job posts were all successful.
if not all(d.get('submission_results') is not None for d in subjob_servers):
raise ValueError("Failed to create all subjobs") # look into recalculating job numbers and use existing jobs
# start subjobs # start subjobs
logger.debug(f"Starting {len(server_frame_ranges) - 1} attempted subjobs") logger.debug(f"Starting {len(subjob_servers) - 1} attempted subjobs")
for hostname, results in submission_results.items(): for server_data in subjob_servers:
worker.children[hostname] = results['id'] worker.children[server_data['hostname']] = server_data['results']['id']
worker.name = f"{worker.name}[{worker.start_frame}-{worker.end_frame}]" worker.name = f"{worker.name}[{worker.start_frame}-{worker.end_frame}]"
except Exception as e: except Exception as e:
# cancel all the subjobs # cancel all the subjobs
logger.error(f"Failed to split job into subjobs: {e}") logger.error(f"Failed to split job into subjobs: {e}")
logger.debug(f"Cancelling {len(server_frame_ranges) - 1} attempted subjobs") logger.debug(f"Cancelling {len(subjob_servers) - 1} attempted subjobs")
[RenderServerProxy(hostname).cancel_job(results['id'], confirm=True) for hostname, results in submission_results.items()] [RenderServerProxy(hostname).cancel_job(results['id'], confirm=True) for hostname, results in submission_results.items()]
@@ -583,15 +529,24 @@ def clear_history():
@server.route('/api/status') @server.route('/api/status')
def status(): def status():
renderer_data = {}
for render_class in RenderWorkerFactory.supported_classes():
renderer_data[render_class.engine.name()] = \
{'version': render_class.engine.version(),
'is_ready': RenderQueue.is_available_for_job(render_class.engine.name())
}
return {"timestamp": datetime.now().isoformat(), return {"timestamp": datetime.now().isoformat(),
"platform": platform.platform(), "platform": platform.platform(),
"cpu_percent": psutil.cpu_percent(percpu=False), "cpu_percent": psutil.cpu_percent(percpu=False),
"cpu_percent_per_cpu": psutil.cpu_percent(percpu=True), "cpu_percent_per_cpu": psutil.cpu_percent(percpu=True),
"cpu_count": psutil.cpu_count(), "cpu_count": psutil.cpu_count(logical=False),
"memory_total": psutil.virtual_memory().total, "memory_total": psutil.virtual_memory().total,
"memory_available": psutil.virtual_memory().available, "memory_available": psutil.virtual_memory().available,
"memory_percent": psutil.virtual_memory().percent, "memory_percent": psutil.virtual_memory().percent,
"job_counts": RenderQueue.job_counts(), "job_counts": RenderQueue.job_counts(),
"renderers": renderer_data,
"hostname": server.config['HOSTNAME'], "hostname": server.config['HOSTNAME'],
"port": server.config['PORT'] "port": server.config['PORT']
} }

View File

@@ -12,7 +12,7 @@ status_colors = {RenderStatus.ERROR: "red", RenderStatus.CANCELLED: 'orange1', R
RenderStatus.NOT_STARTED: "yellow", RenderStatus.SCHEDULED: 'purple', RenderStatus.NOT_STARTED: "yellow", RenderStatus.SCHEDULED: 'purple',
RenderStatus.RUNNING: 'cyan'} RenderStatus.RUNNING: 'cyan'}
categories = [RenderStatus.RUNNING, RenderStatus.ERROR, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED, categories = [RenderStatus.RUNNING, RenderStatus.WAITING, RenderStatus.ERROR, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED,
RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.UNDEFINED] RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.UNDEFINED]
logger = logging.getLogger() logger = logging.getLogger()
@@ -111,10 +111,8 @@ class RenderServerProxy:
def cancel_job(self, job_id, confirm=False): def cancel_job(self, job_id, confirm=False):
return self.request_data(f'job/{job_id}/cancel?confirm={confirm}') return self.request_data(f'job/{job_id}/cancel?confirm={confirm}')
def is_available_for_job(self, renderer, priority=2): def get_status(self):
request_data = self.request_data(f'is_available_for_job?renderer={renderer}&priority={priority}', return self.request_data('status')
timeout=1) or {}
return request_data.get('is_available', False)
def post_job_to_server(self, file_path, job_list, callback=None): def post_job_to_server(self, file_path, job_list, callback=None):

View File

@@ -1,9 +1,13 @@
import logging import logging
import os import os
import socket
import subprocess import subprocess
import threading import threading
from .ffmpeg_helper import generate_thumbnail, save_first_frame import psutil
from lib.server.server_proxy import RenderServerProxy
from lib.utilities.ffmpeg_helper import generate_thumbnail, save_first_frame
logger = logging.getLogger() logger = logging.getLogger()
@@ -45,3 +49,75 @@ def generate_thumbnail_for_job(job, thumb_video_path, thumb_image_path, max_widt
if video_files and not os.path.exists(thumb_video_path): if video_files and not os.path.exists(thumb_video_path):
x = threading.Thread(target=generate_thumb_thread, args=(video_files[0],)) x = threading.Thread(target=generate_thumb_thread, args=(video_files[0],))
x.start() x.start()
def divide_frames_evenly(start_frame, end_frame, num_servers):
    """Split the inclusive range [start_frame, end_frame] evenly across servers.

    Returns a list of (start, end) tuples, one per server that receives at
    least one frame. Leftover frames (when the range does not divide evenly)
    are handed out one at a time to the earliest servers, so chunk sizes
    differ by at most one.

    Returns an empty list when num_servers is not positive or the range is
    empty (end_frame < start_frame).
    """
    if num_servers <= 0 or end_frame < start_frame:
        # Guard: avoids ZeroDivisionError on num_servers == 0 and
        # nonsensical negative-length ranges.
        return []
    frame_range = end_frame - start_frame + 1
    frames_per_server = frame_range // num_servers
    leftover_frames = frame_range % num_servers
    ranges = []
    current_start = start_frame
    for i in range(num_servers):
        current_end = current_start + frames_per_server - 1
        if leftover_frames > 0:
            current_end += 1
            leftover_frames -= 1
        # Skip servers that would get an empty chunk (more servers than frames).
        if current_start <= current_end:
            ranges.append((current_start, current_end))
        current_start = current_end + 1
    return ranges
def divide_frames_by_cpu_count(frame_start, frame_end, servers):
    """Allocate the inclusive range [frame_start, frame_end] proportionally
    to each server's CPU count.

    Args:
        frame_start: first frame of the job.
        frame_end: last frame of the job (inclusive).
        servers: list of dicts, each with 'hostname' and 'cpu_count' keys.

    Returns:
        Dict mapping hostname -> (first_frame, last_frame). Servers whose
        proportional share rounds to zero frames are omitted. Empty dict
        when there are no servers, no frames, or no CPU capacity.
    """
    if not servers or frame_end < frame_start:
        return {}
    total_frames = frame_end - frame_start + 1
    total_performance = sum(server['cpu_count'] for server in servers)
    if total_performance <= 0:
        # Guard: avoids ZeroDivisionError on a degenerate server list.
        return {}
    frame_ranges = {}
    current_frame = frame_start
    allocated_frames = 0
    for i, server in enumerate(servers):
        if i == len(servers) - 1:  # if it's the last server
            # Give all remaining frames to the last server, absorbing
            # rounding drift so every frame is covered exactly once.
            num_frames = total_frames - allocated_frames
        else:
            num_frames = round((server['cpu_count'] / total_performance) * total_frames)
        allocated_frames += num_frames
        # Clamp to frame_end: cumulative rounding can otherwise over-allocate
        # and assign a server frames past the end of the job.
        frame_end_for_server = min(frame_end, current_frame + num_frames - 1)
        if current_frame <= frame_end_for_server:
            frame_ranges[server['hostname']] = (current_frame, frame_end_for_server)
            current_frame = frame_end_for_server + 1
    return frame_ranges
def find_available_servers(server_list, renderer, start_frame, end_frame):
    """Collect the servers (local host included) able to take a subjob for
    ``renderer`` and annotate each with its assigned frame range.

    Remote hosts are polled through their status API and kept only when they
    report the renderer as ready. Frames are then divided in proportion to
    each host's physical CPU count; hosts whose share rounds to zero frames
    are dropped from the result.
    """
    local_hostname = socket.gethostname()
    subjob_servers = [{'hostname': local_hostname,
                       'cpu_count': psutil.cpu_count(logical=False)}]
    remote_hosts = (h for h in server_list if h != local_hostname)
    for hostname in remote_hosts:
        status = RenderServerProxy(hostname).get_status()
        if not status:
            continue
        renderer_info = status.get('renderers', {}).get(renderer, {})
        if renderer_info.get('is_ready', False):
            subjob_servers.append({'hostname': hostname,
                                   'cpu_count': int(status['cpu_count'])})
    if len(subjob_servers) == 1:
        logger.debug("No available servers to split job with. Skipping subjob creation.")
        return subjob_servers
    # Calculate respective frames for each server
    breakdown = divide_frames_by_cpu_count(start_frame, end_frame, subjob_servers)
    assigned = []
    for server in subjob_servers:
        frame_range = breakdown.get(server['hostname'])
        if frame_range is None:
            continue
        server['frame_range'] = frame_range
        server['total_frames'] = frame_range[-1] - frame_range[0] + 1
        assigned.append(server)
    return assigned
if __name__ == "__main__":
    # Ad-hoc manual check against two known hosts on the local network.
    demo_hosts = ['kamino.local', 'deathstar.local']
    print(find_available_servers(demo_hosts, 'blender', 1, 5))

View File

@@ -274,6 +274,7 @@ class BaseRenderWorker(Base):
logger.debug("Starting post-processing work") logger.debug("Starting post-processing work")
self.post_processing() self.post_processing()
self.status = RenderStatus.COMPLETED self.status = RenderStatus.COMPLETED
logger.info(f"Render {self.id}-{self.name} completed successfully after {self.time_elapsed()}")
def post_processing(self): def post_processing(self):
pass pass