Create subjobs after submission - #54 (#79)

* Force start in render queue only starts NOT_STARTED and SCHEDULED jobs

* Refactor adding jobs / subjobs

* Remove dead code

* Fixed issue with bulk job submission

* Cancel job now cancels all subjobs

* Misc fixes

* JSON now returns job hostname

* Add hostname as optional column in DB

* Misc fixes

* Error handling for removing zip file after download

* Clean up imports

* Fixed issue where worker child information would not be saved
This commit is contained in:
2024-07-30 19:22:38 -05:00
committed by GitHub
parent 6d33f262b3
commit 8a3e74660c
8 changed files with 138 additions and 142 deletions

View File

@@ -10,10 +10,6 @@ import requests
from tqdm import tqdm from tqdm import tqdm
from werkzeug.utils import secure_filename from werkzeug.utils import secure_filename
from src.distributed_job_manager import DistributedJobManager
from src.engines.engine_manager import EngineManager
from src.render_queue import RenderQueue
logger = logging.getLogger() logger = logging.getLogger()
@@ -153,70 +149,3 @@ def process_zipped_project(zip_path):
logger.error(f"Error processing zip file: {e}") logger.error(f"Error processing zip file: {e}")
raise ValueError(f"Error processing zip file: {e}") raise ValueError(f"Error processing zip file: {e}")
return extracted_project_path return extracted_project_path
def create_render_jobs(jobs_list, loaded_project_local_path):
    """
    Create one render job per entry in ``jobs_list``.

    For each job description, an output path is derived (next to the loaded
    project, under an ``output`` directory), a worker is created and configured
    from the job data, and the worker is either split into distributed subjobs
    or queued locally. Failures for a single job are captured as an
    ``{'error': ...}`` entry so one bad job does not abort the whole batch.

    Args:
        jobs_list (list): A list of job-data dicts. Recognized keys include
            'renderer' (required), 'output_path', 'engine_version', 'args',
            'initial_status', 'parent', 'name', 'priority', 'start_frame',
            'end_frame', 'enable_split_jobs' and 'force_start'.
        loaded_project_local_path (str): The local path to the loaded project.

    Returns:
        list: One entry per input job — either ``worker.json()`` on success
        or a dict with an 'error' key on failure.
    """
    results = []
    for job_data in jobs_list:
        try:
            # get new output path in output_dir; fall back to the project's
            # basename when the caller did not supply an output_path
            output_path = job_data.get('output_path')
            if not output_path:
                loaded_project_filename = os.path.basename(loaded_project_local_path)
                output_filename = os.path.splitext(loaded_project_filename)[0]
            else:
                output_filename = os.path.basename(output_path)
            # Prepare output path: <project parent dir>/output/<filename>
            output_dir = os.path.join(os.path.dirname(os.path.dirname(loaded_project_local_path)), 'output')
            output_path = os.path.join(output_dir, output_filename)
            os.makedirs(output_dir, exist_ok=True)
            logger.debug(f"New job output path: {output_path}")
            # create & configure jobs — worker defaults are kept for any
            # field the job data does not override
            worker = EngineManager.create_worker(renderer=job_data['renderer'],
                                                 input_path=loaded_project_local_path,
                                                 output_path=output_path,
                                                 engine_version=job_data.get('engine_version'),
                                                 args=job_data.get('args', {}))
            worker.status = job_data.get("initial_status", worker.status)
            worker.parent = job_data.get("parent", worker.parent)
            worker.name = job_data.get("name", worker.name)
            worker.priority = int(job_data.get('priority', worker.priority))
            worker.start_frame = int(job_data.get("start_frame", worker.start_frame))
            worker.end_frame = int(job_data.get("end_frame", worker.end_frame))
            # determine if we can / should split the job: only top-level
            # (parentless) multi-frame jobs with splitting enabled are split
            if job_data.get("enable_split_jobs", False) and (worker.total_frames > 1) and not worker.parent:
                DistributedJobManager.split_into_subjobs(worker, job_data, loaded_project_local_path)
            else:
                logger.debug("Not splitting into subjobs")
            RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
            if not worker.parent:
                # local import avoids a circular import with the API server module
                from src.api.api_server import make_job_ready
                make_job_ready(worker.id)
            results.append(worker.json())
        except FileNotFoundError as e:
            # e.g. no installed engine version could be found or downloaded
            err_msg = f"Cannot create job: {e}"
            logger.error(err_msg)
            results.append({'error': err_msg})
        except Exception as e:
            err_msg = f"Exception creating render job: {e}"
            logger.exception(err_msg)
            results.append({'error': err_msg})
    return results

View File

@@ -17,7 +17,7 @@ import psutil
import yaml import yaml
from flask import Flask, request, send_file, after_this_request, Response, redirect, url_for, abort from flask import Flask, request, send_file, after_this_request, Response, redirect, url_for, abort
from src.api.add_job_helpers import handle_uploaded_project_files, process_zipped_project, create_render_jobs from src.api.add_job_helpers import handle_uploaded_project_files, process_zipped_project
from src.api.serverproxy_manager import ServerProxyManager from src.api.serverproxy_manager import ServerProxyManager
from src.distributed_job_manager import DistributedJobManager from src.distributed_job_manager import DistributedJobManager
from src.engines.core.base_worker import string_to_status, RenderStatus from src.engines.core.base_worker import string_to_status, RenderStatus
@@ -182,24 +182,6 @@ def get_file_list(job_id):
return RenderQueue.job_with_id(job_id).file_list() return RenderQueue.job_with_id(job_id).file_list()
@server.get('/api/job/<job_id>/make_ready')
def make_job_ready(job_id):
    """
    Promote a CONFIGURING / NOT_STARTED job to NOT_STARTED (ready to run).

    If the job has children (distributed subjobs), each child is first made
    ready on its own host via a proxy request. Child keys are assumed to be
    formatted ``<job_id>@<hostname>``.

    Returns:
        (body, status): ``(job JSON, 200)`` on success, ``(error text, 500)``
        on failure, or ``("Not valid command", 405)`` when the job is not in
        a state that can be made ready.
    """
    try:
        found_job = RenderQueue.job_with_id(job_id)
        # only jobs still being configured (or never started) can be made ready
        if found_job.status in [RenderStatus.CONFIGURING, RenderStatus.NOT_STARTED]:
            if found_job.children:
                for child_key in found_job.children.keys():
                    # child_key format: "<child_id>@<hostname>"
                    child_id = child_key.split('@')[0]
                    hostname = child_key.split('@')[-1]
                    ServerProxyManager.get_proxy_for_hostname(hostname).request_data(f'job/{child_id}/make_ready')
            found_job.status = RenderStatus.NOT_STARTED
            RenderQueue.save_state()
            return found_job.json(), 200
    except Exception as e:
        return f"Error making job ready: {e}", 500
    # falls through here when the job exists but is in a non-promotable state
    return "Not valid command", 405
@server.route('/api/job/<job_id>/download_all') @server.route('/api/job/<job_id>/download_all')
def download_all(job_id): def download_all(job_id):
zip_filename = None zip_filename = None
@@ -207,7 +189,10 @@ def download_all(job_id):
@after_this_request @after_this_request
def clear_zip(response): def clear_zip(response):
if zip_filename and os.path.exists(zip_filename): if zip_filename and os.path.exists(zip_filename):
try:
os.remove(zip_filename) os.remove(zip_filename)
except Exception as e:
logger.warning(f"Error removing zip file '{zip_filename}': {e}")
return response return response
found_job = RenderQueue.job_with_id(job_id) found_job = RenderQueue.job_with_id(job_id)
@@ -283,13 +268,13 @@ def add_job_handler():
if loaded_project_local_path.lower().endswith('.zip'): if loaded_project_local_path.lower().endswith('.zip'):
loaded_project_local_path = process_zipped_project(loaded_project_local_path) loaded_project_local_path = process_zipped_project(loaded_project_local_path)
results = create_render_jobs(jobs_list, loaded_project_local_path) results = []
for response in results: for new_job_data in jobs_list:
if response.get('error', None): new_job = DistributedJobManager.create_render_job(new_job_data, loaded_project_local_path)
return results, 400 results.append(new_job.json())
return results, 200 return results, 200
except Exception as e: except Exception as e:
logger.exception(f"Unknown error adding job: {e}") logger.exception(f"Error adding job: {e}")
return 'unknown error', 500 return 'unknown error', 500

View File

@@ -17,7 +17,8 @@ status_colors = {RenderStatus.ERROR: "red", RenderStatus.CANCELLED: 'orange1', R
RenderStatus.RUNNING: 'cyan', RenderStatus.WAITING_FOR_SUBJOBS: 'blue'} RenderStatus.RUNNING: 'cyan', RenderStatus.WAITING_FOR_SUBJOBS: 'blue'}
categories = [RenderStatus.RUNNING, RenderStatus.WAITING_FOR_SUBJOBS, RenderStatus.ERROR, RenderStatus.NOT_STARTED, categories = [RenderStatus.RUNNING, RenderStatus.WAITING_FOR_SUBJOBS, RenderStatus.ERROR, RenderStatus.NOT_STARTED,
RenderStatus.SCHEDULED, RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.UNDEFINED] RenderStatus.SCHEDULED, RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.UNDEFINED,
RenderStatus.CONFIGURING]
logger = logging.getLogger() logger = logging.getLogger()
OFFLINE_MAX = 4 OFFLINE_MAX = 4

View File

@@ -1,15 +1,17 @@
import logging import logging
import os import os
import socket import socket
import threading
import time import time
import zipfile import zipfile
from concurrent.futures import ThreadPoolExecutor
import requests import requests
from plyer import notification from plyer import notification
from pubsub import pub from pubsub import pub
from concurrent.futures import ThreadPoolExecutor
from src.api.server_proxy import RenderServerProxy from src.api.server_proxy import RenderServerProxy
from src.engines.engine_manager import EngineManager
from src.render_queue import RenderQueue from src.render_queue import RenderQueue
from src.utilities.misc_helper import get_file_size_human from src.utilities.misc_helper import get_file_size_human
from src.utilities.status_utils import RenderStatus, string_to_status from src.utilities.status_utils import RenderStatus, string_to_status
@@ -86,6 +88,68 @@ class DistributedJobManager:
except Exception as e: except Exception as e:
logger.debug(f"Unable to show UI notification: {e}") logger.debug(f"Unable to show UI notification: {e}")
# --------------------------------------------
# Create Job
# --------------------------------------------
@classmethod
def create_render_job(cls, job_data, loaded_project_local_path):
    """
    Create and enqueue a single render job from a job-data dict.

    Derives an output path next to the loaded project (under an ``output``
    directory), builds and configures a worker, then either splits it into
    distributed subjobs asynchronously (for parentless multi-frame jobs with
    splitting enabled) or adds it straight to the local render queue.

    Args:
        job_data (dict): Job data. Recognized keys include 'renderer'
            (required), 'output_path', 'engine_version', 'args',
            'initial_status', 'parent', 'name', 'priority', 'start_frame',
            'end_frame', 'enable_split_jobs' and 'force_start'.
        loaded_project_local_path (str): The local path to the loaded project.

    Returns:
        worker: The created job worker.
    """
    # get new output path in output_dir; fall back to the project's basename
    # when the caller did not supply an output_path
    output_path = job_data.get('output_path')
    if not output_path:
        loaded_project_filename = os.path.basename(loaded_project_local_path)
        output_filename = os.path.splitext(loaded_project_filename)[0]
    else:
        output_filename = os.path.basename(output_path)
    # Prepare output path: <project parent dir>/output/<filename>
    output_dir = os.path.join(os.path.dirname(os.path.dirname(loaded_project_local_path)), 'output')
    output_path = os.path.join(output_dir, output_filename)
    os.makedirs(output_dir, exist_ok=True)
    logger.debug(f"New job output path: {output_path}")
    # create & configure jobs — worker defaults are kept for any field the
    # job data does not override
    worker = EngineManager.create_worker(renderer=job_data['renderer'],
                                         input_path=loaded_project_local_path,
                                         output_path=output_path,
                                         engine_version=job_data.get('engine_version'),
                                         args=job_data.get('args', {}))
    worker.status = job_data.get("initial_status", worker.status)  # todo: is this necessary?
    worker.parent = job_data.get("parent", worker.parent)
    worker.name = job_data.get("name", worker.name)
    worker.priority = int(job_data.get('priority', worker.priority))
    worker.start_frame = int(job_data.get("start_frame", worker.start_frame))
    worker.end_frame = int(job_data.get("end_frame", worker.end_frame))
    # record which host owns this job so children can be addressed as id@host
    worker.hostname = socket.gethostname()
    # determine if we can / should split the job: only top-level (parentless)
    # multi-frame jobs with splitting enabled are split, in the background
    if job_data.get("enable_split_jobs", False) and (worker.total_frames > 1) and not worker.parent:
        cls.split_into_subjobs_async(worker, job_data, loaded_project_local_path)
    else:
        logger.debug("Not splitting into subjobs")
        RenderQueue.add_to_render_queue(worker, force_start=job_data.get('force_start', False))
    return worker
# --------------------------------------------
# Handling Subjobs
# --------------------------------------------
@classmethod @classmethod
def handle_subjob_status_change(cls, local_job, subjob_data): def handle_subjob_status_change(cls, local_job, subjob_data):
""" """
@@ -142,7 +206,7 @@ class DistributedJobManager:
RenderServerProxy(subjob_hostname).get_job_files(subjob_id, zip_file_path) RenderServerProxy(subjob_hostname).get_job_files(subjob_id, zip_file_path)
logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}") logger.info(f"File transfer complete for {logname} - Transferred {get_file_size_human(zip_file_path)}")
except Exception as e: except Exception as e:
logger.exception(f"Exception downloading files from remote server: {e}") logger.error(f"Error downloading files from remote server: {e}")
local_job.children[child_key]['download_status'] = 'failed' local_job.children[child_key]['download_status'] = 'failed'
return False return False
@@ -218,8 +282,20 @@ class DistributedJobManager:
f"{', '.join(list(subjobs_not_downloaded().keys()))}") f"{', '.join(list(subjobs_not_downloaded().keys()))}")
time.sleep(5) time.sleep(5)
# --------------------------------------------
# Creating Subjobs
# --------------------------------------------
@classmethod @classmethod
def split_into_subjobs(cls, worker, job_data, project_path, system_os=None): def split_into_subjobs_async(cls, parent_worker, job_data, project_path, system_os=None):
# todo: I don't love this
parent_worker.status = RenderStatus.CONFIGURING
cls.background_worker = threading.Thread(target=cls.split_into_subjobs, args=(parent_worker, job_data,
project_path, system_os))
cls.background_worker.start()
@classmethod
def split_into_subjobs(cls, parent_worker, job_data, project_path, system_os=None, specific_servers=None):
""" """
Splits a job into subjobs and distributes them among available servers. Splits a job into subjobs and distributes them among available servers.
@@ -228,56 +304,50 @@ class DistributedJobManager:
subjob. subjob.
Args: Args:
worker (Worker): The worker that is handling the job. parent_worker (Worker): The worker that is handling the job.
job_data (dict): The data for the job to be split. job_data (dict): The data for the job to be split.
project_path (str): The path to the project associated with the job. project_path (str): The path to the project associated with the job.
system_os (str, optional): The operating system of the servers. Defaults to None. system_os (str, optional): The operating system of the servers. Default is any OS.
specific_servers (list, optional): List of specific servers to split work between. Defaults to all found.
""" """
# Check availability # Check availability
available_servers = cls.find_available_servers(worker.renderer, system_os) parent_worker.status = RenderStatus.CONFIGURING
available_servers = specific_servers if specific_servers else cls.find_available_servers(parent_worker.renderer, system_os)
logger.debug(f"Splitting into subjobs - Available servers: {available_servers}") logger.debug(f"Splitting into subjobs - Available servers: {available_servers}")
subjob_servers = cls.distribute_server_work(worker.start_frame, worker.end_frame, available_servers) subjob_servers = cls.distribute_server_work(parent_worker.start_frame, parent_worker.end_frame, available_servers)
local_hostname = socket.gethostname()
# Prep and submit these sub-jobs # Prep and submit these sub-jobs
logger.info(f"Job {worker.id} split plan: {subjob_servers}") logger.info(f"Job {parent_worker.id} split plan: {subjob_servers}")
try: try:
for server_data in subjob_servers: for subjob_data in subjob_servers:
server_hostname = server_data['hostname'] subjob_hostname = subjob_data['hostname']
if server_hostname != local_hostname: if subjob_hostname != parent_worker.hostname:
post_results = cls.__create_subjob(job_data, local_hostname, project_path, server_data, post_results = cls.__create_subjob(job_data, parent_worker.hostname, project_path, subjob_data,
server_hostname, worker) subjob_hostname, parent_worker)
if post_results.ok: if not post_results.ok:
server_data['submission_results'] = post_results.json()[0] ValueError(f"Failed to create subjob on {subjob_hostname}")
else:
logger.error(f"Failed to create subjob on {server_hostname}") # save child info
break submission_results = post_results.json()[0]
child_key = f"{submission_results['id']}@{subjob_hostname}"
parent_worker.children[child_key] = submission_results
else: else:
# truncate parent render_job # truncate parent render_job
worker.start_frame = max(server_data['frame_range'][0], worker.start_frame) parent_worker.start_frame = max(subjob_data['frame_range'][0], parent_worker.start_frame)
worker.end_frame = min(server_data['frame_range'][-1], worker.end_frame) parent_worker.end_frame = min(subjob_data['frame_range'][-1], parent_worker.end_frame)
logger.info(f"Local job now rendering from {worker.start_frame} to {worker.end_frame}") logger.info(f"Local job now rendering from {parent_worker.start_frame} to {parent_worker.end_frame}")
server_data['submission_results'] = worker.json()
# check that job posts were all successful.
if not all(d.get('submission_results') is not None for d in subjob_servers):
raise ValueError("Failed to create all subjobs") # look into recalculating job #s and use exising jobs
# start subjobs # start subjobs
logger.debug(f"Starting {len(subjob_servers) - 1} attempted subjobs") logger.debug(f"Created {len(subjob_servers) - 1} subjobs successfully")
for server_data in subjob_servers: parent_worker.name = f"{parent_worker.name}[{parent_worker.start_frame}-{parent_worker.end_frame}]"
if server_data['hostname'] != local_hostname: parent_worker.status = RenderStatus.NOT_STARTED # todo: this won't work with scheduled starts
child_key = f"{server_data['submission_results']['id']}@{server_data['hostname']}"
worker.children[child_key] = server_data['submission_results']
worker.name = f"{worker.name}[{worker.start_frame}-{worker.end_frame}]"
except Exception as e: except Exception as e:
# cancel all the subjobs # cancel all the subjobs
logger.error(f"Failed to split job into subjobs: {e}") logger.error(f"Failed to split job into subjobs: {e}")
logger.debug(f"Cancelling {len(subjob_servers) - 1} attempted subjobs") logger.debug(f"Cancelling {len(subjob_servers) - 1} attempted subjobs")
# [RenderServerProxy(hostname).cancel_job(results['id'], confirm=True) for hostname, results in RenderServerProxy(parent_worker.hostname).cancel_job(parent_worker.id, confirm=True)
# submission_results.items()] # todo: fix this
@staticmethod @staticmethod
def __create_subjob(job_data, local_hostname, project_path, server_data, server_hostname, worker): def __create_subjob(job_data, local_hostname, project_path, server_data, server_hostname, worker):
@@ -292,6 +362,10 @@ class DistributedJobManager:
file_path=project_path, job_list=[subjob]) file_path=project_path, job_list=[subjob])
return post_results return post_results
# --------------------------------------------
# Server Handling
# --------------------------------------------
@staticmethod @staticmethod
def distribute_server_work(start_frame, end_frame, available_servers, method='cpu_benchmark'): def distribute_server_work(start_frame, end_frame, available_servers, method='cpu_benchmark'):
""" """
@@ -440,7 +514,7 @@ if __name__ == '__main__':
print("Starting Zeroconf...") print("Starting Zeroconf...")
time.sleep(2) time.sleep(2)
available_servers = DistributedJobManager.find_available_servers('blender') available_servers = DistributedJobManager.find_available_servers('blender')
print(f"AVAILABLE SERVERS: {available_servers}") print(f"AVAILABLE SERVERS ({len(available_servers)}): {available_servers}")
results = DistributedJobManager.distribute_server_work(1, 100, available_servers) results = DistributedJobManager.distribute_server_work(1, 100, available_servers)
print(f"RESULTS: {results}") print(f"RESULTS: {results}")
ZeroconfServer.stop() ZeroconfServer.stop()

View File

@@ -11,6 +11,7 @@ import psutil
from pubsub import pub from pubsub import pub
from sqlalchemy import Column, Integer, String, DateTime, JSON from sqlalchemy import Column, Integer, String, DateTime, JSON
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.mutable import MutableDict
from src.utilities.misc_helper import get_time_elapsed from src.utilities.misc_helper import get_time_elapsed
from src.utilities.status_utils import RenderStatus, string_to_status from src.utilities.status_utils import RenderStatus, string_to_status
@@ -23,6 +24,7 @@ class BaseRenderWorker(Base):
__tablename__ = 'render_workers' __tablename__ = 'render_workers'
id = Column(String, primary_key=True) id = Column(String, primary_key=True)
hostname = Column(String, nullable=True)
input_path = Column(String) input_path = Column(String)
output_path = Column(String) output_path = Column(String)
date_created = Column(DateTime) date_created = Column(DateTime)
@@ -36,7 +38,7 @@ class BaseRenderWorker(Base):
start_frame = Column(Integer) start_frame = Column(Integer)
end_frame = Column(Integer, nullable=True) end_frame = Column(Integer, nullable=True)
parent = Column(String, nullable=True) parent = Column(String, nullable=True)
children = Column(JSON) children = Column(MutableDict.as_mutable(JSON))
name = Column(String) name = Column(String)
file_hash = Column(String) file_hash = Column(String)
_status = Column(String) _status = Column(String)
@@ -60,6 +62,7 @@ class BaseRenderWorker(Base):
# Essential Info # Essential Info
self.id = generate_id() self.id = generate_id()
self.hostname = None
self.input_path = input_path self.input_path = input_path
self.output_path = output_path self.output_path = output_path
self.args = args or {} self.args = args or {}
@@ -85,7 +88,7 @@ class BaseRenderWorker(Base):
self.end_time = None self.end_time = None
# History # History
self.status = RenderStatus.CONFIGURING self.status = RenderStatus.NOT_STARTED
self.warnings = [] self.warnings = []
self.errors = [] self.errors = []
@@ -306,6 +309,7 @@ class BaseRenderWorker(Base):
job_dict = { job_dict = {
'id': self.id, 'id': self.id,
'name': self.name, 'name': self.name,
'hostname': self.hostname,
'input_path': self.input_path, 'input_path': self.input_path,
'output_path': self.output_path, 'output_path': self.output_path,
'priority': self.priority, 'priority': self.priority,

View File

@@ -208,7 +208,7 @@ class EngineManager:
worker_class = cls.engine_with_name(renderer).worker_class() worker_class = cls.engine_with_name(renderer).worker_class()
# check to make sure we have versions installed # check to make sure we have versions installed
all_versions = EngineManager.all_versions_for_engine(renderer) all_versions = cls.all_versions_for_engine(renderer)
if not all_versions: if not all_versions:
raise FileNotFoundError(f"Cannot find any installed {renderer} engines") raise FileNotFoundError(f"Cannot find any installed {renderer} engines")
@@ -222,7 +222,7 @@ class EngineManager:
# Download the required engine if not found locally # Download the required engine if not found locally
if not engine_path: if not engine_path:
download_result = EngineManager.download_engine(renderer, engine_version) download_result = cls.download_engine(renderer, engine_version)
if not download_result: if not download_result:
raise FileNotFoundError(f"Cannot download requested version: {renderer} {engine_version}") raise FileNotFoundError(f"Cannot download requested version: {renderer} {engine_version}")
engine_path = download_result['path'] engine_path = download_result['path']

View File

@@ -8,10 +8,10 @@ from collections import deque
from PyQt6.QtCore import QObject, pyqtSignal from PyQt6.QtCore import QObject, pyqtSignal
from PyQt6.QtWidgets import QApplication from PyQt6.QtWidgets import QApplication
from .render_queue import RenderQueue
from .ui.main_window import MainWindow
from src.api.api_server import start_server from src.api.api_server import start_server
from src.engines.engine_manager import EngineManager
from src.render_queue import RenderQueue
from src.ui.main_window import MainWindow
from src.utilities.config import Config from src.utilities.config import Config
from src.utilities.misc_helper import system_safe_path from src.utilities.misc_helper import system_safe_path
@@ -28,6 +28,9 @@ def run() -> int:
# Load Config YAML # Load Config YAML
Config.setup_config_dir() Config.setup_config_dir()
Config.load_config(system_safe_path(os.path.join(Config.config_dir(), 'config.yaml'))) Config.load_config(system_safe_path(os.path.join(Config.config_dir(), 'config.yaml')))
EngineManager.engines_path = system_safe_path(
os.path.join(os.path.join(os.path.expanduser(Config.upload_folder),
'engines')))
logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S', logging.basicConfig(format='%(asctime)s: %(levelname)s: %(module)s: %(message)s', datefmt='%d-%b-%y %H:%M:%S',
level=Config.server_log_level.upper()) level=Config.server_log_level.upper())

View File

@@ -32,7 +32,7 @@ class RenderQueue:
def add_to_render_queue(cls, render_job, force_start=False): def add_to_render_queue(cls, render_job, force_start=False):
logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job)) logger.debug('Adding priority {} job to render queue: {}'.format(render_job.priority, render_job))
cls.job_queue.append(render_job) cls.job_queue.append(render_job)
if force_start: if force_start and render_job.status in (RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED):
cls.start_job(render_job) cls.start_job(render_job)
cls.session.add(render_job) cls.session.add(render_job)
cls.save_state() cls.save_state()