diff --git a/lib/render_queue.py b/lib/render_queue.py index e72b2b0..99770d8 100755 --- a/lib/render_queue.py +++ b/lib/render_queue.py @@ -5,6 +5,7 @@ from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from .workers.base_worker import RenderStatus, BaseRenderWorker, Base +from .workers.worker_factory import RenderWorkerFactory logger = logging.getLogger() @@ -81,7 +82,10 @@ class RenderQueue: cls.session.commit() @classmethod - def is_available_for_job(cls, renderer, priority): + def is_available_for_job(cls, renderer, priority=2): + if not RenderWorkerFactory.class_for_name(renderer).engine.renderer_path(): + return False + instances = cls.renderer_instances() higher_priority_jobs = [x for x in cls.running_jobs() if x.priority < priority] max_renderers = renderer in instances.keys() and instances[renderer] >= cls.maximum_renderer_instances.get(renderer, 1) diff --git a/lib/server/api_server.py b/lib/server/api_server.py index ebc1d0f..093e6eb 100755 --- a/lib/server/api_server.py +++ b/lib/server/api_server.py @@ -1,13 +1,9 @@ #!/usr/bin/env python3 import json -import logging -import os import pathlib import platform import shutil -import socket import ssl -import threading import time import zipfile from datetime import datetime @@ -15,15 +11,13 @@ from urllib.request import urlretrieve from zipfile import ZipFile import json2html -import psutil import yaml from flask import Flask, request, render_template, send_file, after_this_request, Response, redirect, url_for, abort from werkzeug.utils import secure_filename from lib.render_queue import RenderQueue, JobNotFoundError -from lib.server.server_proxy import RenderServerProxy from lib.server.zeroconf_server import ZeroconfServer -from lib.utilities.server_helper import generate_thumbnail_for_job +from lib.utilities.server_helper import * from lib.workers.base_worker import string_to_status, RenderStatus from lib.workers.worker_factory import RenderWorkerFactory @@ -264,23 +258,6 @@ def detected_clients(): return server.config['ZEROCONF_SERVER'].found_clients() -@server.route('/api/is_available_for_job', methods=['POST', 'GET']) -def available_for_job(): - """ - Check queue to see if it can take a job with a given renderer and priority - """ - renderer = request.args.get('renderer') - priority = request.args.get('priority') - - if not renderer or not priority: - return {"error": "Both 'renderer' and 'priority' parameters are required"}, 400 - elif renderer not in RenderWorkerFactory.supported_renderers(): - return {"error": f"Unsupported renderer: {renderer}"}, 400 - else: - return {"is_available": RenderQueue.is_available_for_job(renderer, priority), - 'renderer': renderer, 'priority': priority}, 200 - - @server.post('/api/add_job') def add_job_handler(): # initial handling of raw data @@ -441,82 +418,51 @@ def create_subjobs(worker, job_data, project_path): # Check availablity local_hostname = server.config['HOSTNAME'] found_servers = [x for x in server.config['ZEROCONF_SERVER'].found_clients() if local_hostname not in x] - available_servers = [local_hostname] + [hostname for hostname in found_servers if - RenderServerProxy(hostname).is_available_for_job(renderer=worker.renderer, - priority=worker.priority)] - if len(available_servers) <= 1: - logger.debug("No available servers to split job with. Skipping subjob creation.") - return - - logger.info(f"Found {len(available_servers) - 1} additional available servers | " - f"Breaking up job into {len(available_servers)} jobs") - logger.debug(f"Available servers: {available_servers}") - - def divide_frames(start_frame, end_frame, num_servers): - frame_range = end_frame - start_frame + 1 - frames_per_server = frame_range // num_servers - leftover_frames = frame_range % num_servers - - ranges = [] - current_start = start_frame - for i in range(num_servers): - current_end = current_start + frames_per_server - 1 - if leftover_frames > 0: - current_end += 1 - leftover_frames -= 1 - if current_start <= current_end: - ranges.append((current_start, current_end)) - current_start = current_end + 1 - - return ranges - - # Calculate respective frames for each server - server_frame_ranges = {} - for idx, frame_range in enumerate(divide_frames(worker.start_frame, worker.end_frame, len(available_servers))): - server_frame_ranges[available_servers[idx]] = frame_range - - logger.info(f"Job {worker.id} split plan: {server_frame_ranges}") + subjob_servers = find_available_servers(found_servers, worker.renderer, worker.start_frame, worker.end_frame) # Prep and submit these sub-jobs + logger.info(f"Job {worker.id} split plan: {subjob_servers}") submission_results = {} try: - for server_hostname, frame_range in server_frame_ranges.items(): + for server_data in subjob_servers: + server_hostname = server_data['hostname'] if server_hostname != local_hostname: subjob = job_data.copy() - subjob['name'] = f"{worker.name}[{frame_range[0]}-{frame_range[-1]}]" + subjob['name'] = f"{worker.name}[{server_data['frame_range'][0]}-{server_data['frame_range'][-1]}]" subjob['parent'] = f"{worker.id}@{local_hostname}" - subjob['start_frame'] = frame_range[0] - subjob['end_frame'] = frame_range[-1] + subjob['start_frame'] = server_data['frame_range'][0] + subjob['end_frame'] = server_data['frame_range'][-1] logger.debug(f"Posting subjob with frames {subjob['start_frame']}-" f"{subjob['end_frame']} to {server_hostname}") post_results = RenderServerProxy(server_hostname).post_job_to_server( file_path=project_path, job_list=[subjob]) if post_results.ok: - submission_results[server_hostname] = post_results.json()[0] + server_data['submission_results'] = post_results.json()[0] else: logger.error(f"Failed to create subjob on {server_hostname}") break + else: + # truncate parent render_job + worker.start_frame = max(server_data['frame_range'][0], worker.start_frame) + worker.end_frame = min(server_data['frame_range'][-1], worker.end_frame) + logger.info(f"Local job now rendering from {worker.start_frame} to {worker.end_frame}") # check that job posts were all successful. - if len(submission_results) != (len(server_frame_ranges) - 1): + if not all(d.get('submission_results') is not None for d in subjob_servers): raise ValueError("Failed to create all subjobs") # look into recalculating job numbers and use exising jobs - # truncate parent render_job - worker.end_frame = min(server_frame_ranges[local_hostname][-1], worker.end_frame) - logger.info(f"Local job now rendering from {worker.start_frame} to {worker.end_frame}") - # start subjobs - logger.debug(f"Starting {len(server_frame_ranges) - 1} attempted subjobs") - for hostname, results in submission_results.items(): - worker.children[hostname] = results['id'] + logger.debug(f"Starting {len(subjob_servers) - 1} attempted subjobs") + for server_data in subjob_servers: + worker.children[server_data['hostname']] = server_data['results']['id'] worker.name = f"{worker.name}[{worker.start_frame}-{worker.end_frame}]" except Exception as e: # cancel all the subjobs logger.error(f"Failed to split job into subjobs: {e}") - logger.debug(f"Cancelling {len(server_frame_ranges) - 1} attempted subjobs") + logger.debug(f"Cancelling {len(subjob_servers) - 1} attempted subjobs") [RenderServerProxy(hostname).cancel_job(results['id'], confirm=True) for hostname, results in submission_results.items()] @@ -583,15 +529,24 @@ def clear_history(): @server.route('/api/status') def status(): + + renderer_data = {} + for render_class in RenderWorkerFactory.supported_classes(): + renderer_data[render_class.engine.name()] = \ + {'version': render_class.engine.version(), + 'is_ready': RenderQueue.is_available_for_job(render_class.engine.name()) + } + return {"timestamp": datetime.now().isoformat(), "platform": platform.platform(), "cpu_percent": psutil.cpu_percent(percpu=False), "cpu_percent_per_cpu": psutil.cpu_percent(percpu=True), - "cpu_count": psutil.cpu_count(), + "cpu_count": psutil.cpu_count(logical=False), "memory_total": psutil.virtual_memory().total, "memory_available": psutil.virtual_memory().available, "memory_percent": psutil.virtual_memory().percent, "job_counts": RenderQueue.job_counts(), + "renderers": renderer_data, "hostname": server.config['HOSTNAME'], "port": server.config['PORT'] } diff --git a/lib/server/server_proxy.py b/lib/server/server_proxy.py index 3707471..ec55019 100644 --- a/lib/server/server_proxy.py +++ b/lib/server/server_proxy.py @@ -12,7 +12,7 @@ status_colors = {RenderStatus.ERROR: "red", RenderStatus.CANCELLED: 'orange1', R RenderStatus.NOT_STARTED: "yellow", RenderStatus.SCHEDULED: 'purple', RenderStatus.RUNNING: 'cyan'} -categories = [RenderStatus.RUNNING, RenderStatus.ERROR, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED, +categories = [RenderStatus.RUNNING, RenderStatus.WAITING, RenderStatus.ERROR, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED, RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.UNDEFINED] logger = logging.getLogger() @@ -111,10 +111,8 @@ class RenderServerProxy: def cancel_job(self, job_id, confirm=False): return self.request_data(f'job/{job_id}/cancel?confirm={confirm}') - def is_available_for_job(self, renderer, priority=2): - request_data = self.request_data(f'is_available_for_job?renderer={renderer}&priority={priority}', - timeout=1) or {} - return request_data.get('is_available', False) + def get_status(self): + return self.request_data('status') def post_job_to_server(self, file_path, job_list, callback=None): diff --git a/lib/utilities/server_helper.py b/lib/utilities/server_helper.py index 0246931..ebad005 100644 --- a/lib/utilities/server_helper.py +++ b/lib/utilities/server_helper.py @@ -1,9 +1,13 @@ import logging import os +import socket import subprocess import threading -from .ffmpeg_helper import generate_thumbnail, save_first_frame +import psutil + +from lib.server.server_proxy import RenderServerProxy +from lib.utilities.ffmpeg_helper import generate_thumbnail, save_first_frame logger = logging.getLogger() @@ -45,3 +49,75 @@ def generate_thumbnail_for_job(job, thumb_video_path, thumb_image_path, max_widt if video_files and not os.path.exists(thumb_video_path): x = threading.Thread(target=generate_thumb_thread, args=(video_files[0],)) x.start() + + +def divide_frames_evenly(start_frame, end_frame, num_servers): + frame_range = end_frame - start_frame + 1 + frames_per_server = frame_range // num_servers + leftover_frames = frame_range % num_servers + + ranges = [] + current_start = start_frame + for i in range(num_servers): + current_end = current_start + frames_per_server - 1 + if leftover_frames > 0: + current_end += 1 + leftover_frames -= 1 + if current_start <= current_end: + ranges.append((current_start, current_end)) + current_start = current_end + 1 + + return ranges + + +def divide_frames_by_cpu_count(frame_start, frame_end, servers): + total_frames = frame_end - frame_start + 1 + total_performance = sum(server['cpu_count'] for server in servers) + + frame_ranges = {} + current_frame = frame_start + allocated_frames = 0 + + for i, server in enumerate(servers): + if i == len(servers) - 1: # if it's the last server + # Give all remaining frames to the last server + num_frames = total_frames - allocated_frames + else: + num_frames = round((server['cpu_count'] / total_performance) * total_frames) + allocated_frames += num_frames + + frame_end_for_server = current_frame + num_frames - 1 + + if current_frame <= frame_end_for_server: + frame_ranges[server['hostname']] = (current_frame, frame_end_for_server) + current_frame = frame_end_for_server + 1 + + return frame_ranges + + +def find_available_servers(server_list, renderer, start_frame, end_frame): + local_hostname = socket.gethostname() + subjob_servers = [{'hostname': local_hostname, 'cpu_count': psutil.cpu_count(logical=False)}] + for hostname in server_list: + if hostname != local_hostname: + response = RenderServerProxy(hostname).get_status() + if response and response.get('renderers', {}).get(renderer, {}).get('is_ready', False): + subjob_servers.append({'hostname': hostname, 'cpu_count': int(response['cpu_count'])}) + + if len(subjob_servers) == 1: + logger.debug("No available servers to split job with. Skipping subjob creation.") + return subjob_servers + + # Calculate respective frames for each server + breakdown = divide_frames_by_cpu_count(start_frame, end_frame, subjob_servers) + subjob_servers = [server for server in subjob_servers if breakdown.get(server['hostname']) is not None] + for server in subjob_servers: + server['frame_range'] = breakdown[server['hostname']] + server['total_frames'] = breakdown[server['hostname']][-1] - breakdown[server['hostname']][0] + 1 + + return subjob_servers + + +if __name__ == "__main__": + found_servers = ['kamino.local', 'deathstar.local'] + print(find_available_servers(found_servers, 'blender', 1, 5)) diff --git a/lib/workers/base_worker.py b/lib/workers/base_worker.py index 9cca1b3..5367564 100644 --- a/lib/workers/base_worker.py +++ b/lib/workers/base_worker.py @@ -274,6 +274,7 @@ class BaseRenderWorker(Base): logger.debug("Starting post-processing work") self.post_processing() self.status = RenderStatus.COMPLETED + logger.info(f"Render {self.id}-{self.name} completed successfully after {self.time_elapsed()}") def post_processing(self): pass