diff --git a/src/api/add_job_helpers.py b/src/api/add_job_helpers.py index 39950b0..37f5870 100644 --- a/src/api/add_job_helpers.py +++ b/src/api/add_job_helpers.py @@ -18,6 +18,24 @@ logger = logging.getLogger() def handle_uploaded_project_files(request, jobs_list, upload_directory): + """ + Handles the uploaded project files. + + This method takes a request with a file, a list of jobs, and an upload directory. It checks if the file was uploaded + directly, if it needs to be downloaded from a URL, or if it's already present on the local file system. It then + moves the file to the appropriate directory and returns the local path to the file and its name. + + Args: + request (Request): The request object containing the file. + jobs_list (list): A list of jobs. The first job in the list is used to get the file's URL and local path. + upload_directory (str): The directory where the file should be uploaded. + + Raises: + ValueError: If no valid project paths are found. + + Returns: + tuple: A tuple containing the local path to the loaded project file and its name. + """ # Initialize default values loaded_project_local_path = None @@ -95,7 +113,21 @@ def download_project_from_url(project_url): def process_zipped_project(zip_path): - # Given a zip path, extract its content, and return the main project file path + """ + Processes a zipped project. + + This method takes a path to a zip file, extracts its contents, and returns the path to the extracted project file. + If the zip file contains more than one project file or none, an error is raised. + + Args: + zip_path (str): The path to the zip file. + + Raises: + ValueError: If there's more than 1 project file or none in the zip file. + + Returns: + str: The path to the main project file. + """ work_path = os.path.dirname(zip_path) try: @@ -125,8 +157,21 @@ def process_zipped_project(zip_path): def create_render_jobs(jobs_list, loaded_project_local_path, job_dir): - results = [] + """ + Creates render jobs. + This method takes a list of job data, a local path to a loaded project, and a job directory. It creates a render + job for each job data in the list and appends the result to a list. The list of results is then returned. + + Args: + jobs_list (list): A list of job data. + loaded_project_local_path (str): The local path to the loaded project. + job_dir (str): The job directory. + + Returns: + list: A list of results from creating the render jobs. + """ + results = [] for job_data in jobs_list: try: # get new output path in output_dir diff --git a/src/api/server_proxy.py b/src/api/server_proxy.py index 76ff675..6cca41f 100644 --- a/src/api/server_proxy.py +++ b/src/api/server_proxy.py @@ -24,7 +24,17 @@ LOOPBACK = '127.0.0.1' class RenderServerProxy: + """ + The ServerProxy class is responsible for interacting with a remote server. + It provides methods to request data from the server and store the status of the server. + Attributes: + system_cpu (str): The CPU type of the system. + system_cpu_count (int): The number of CPUs in the system. + system_os (str): The operating system of the system. + system_os_version (str): The version of the operating system. + """ + def __init__(self, hostname, server_port="8080"): self.hostname = hostname self.port = server_port @@ -43,6 +53,9 @@ class RenderServerProxy: self.system_os = None self.system_os_version = None + def __repr__(self): + return f"" + def connect(self): return self.status() @@ -118,8 +131,7 @@ class RenderServerProxy: self.__jobs_cache_token = status_result['token'] def get_data(self, timeout=5): - all_data = self.request_data('full_status', timeout=timeout) - return all_data + return self.request_data('full_status', timeout=timeout) def cancel_job(self, job_id, confirm=False): return self.request_data(f'job/{job_id}/cancel?confirm={confirm}') @@ -143,12 +155,32 @@ class RenderServerProxy: return self.request_data('all_engines') def notify_parent_of_status_change(self, parent_id, subjob): + """ + Notifies the parent job of a status change in a subjob. + + Args: + parent_id (str): The ID of the parent job. + subjob (Job): The subjob that has changed status. + + Returns: + Response: The response from the server. + """ hostname = LOOPBACK if self.is_localhost else self.hostname return requests.post(f'http://{hostname}:{self.port}/api/job/{parent_id}/notify_parent_of_status_change', json=subjob.json()) def post_job_to_server(self, file_path, job_list, callback=None): + """ + Posts a job to the server. + Args: + file_path (str): The path to the file to upload. + job_list (list): A list of jobs to post. + callback (function, optional): A callback function to call during the upload. Defaults to None. + + Returns: + Response: The response from the server. + """ # bypass uploading file if posting to localhost if self.is_localhost: jobs_with_path = [{**item, "local_path": file_path} for item in job_list] @@ -188,10 +220,30 @@ class RenderServerProxy: # --- Renderer --- # def get_renderer_info(self, timeout=5): + """ + Fetches renderer information from the server. + + Args: + timeout (int, optional): The number of seconds to wait for a response from the server. Defaults to 5. + + Returns: + dict: A dictionary containing the renderer information. + """ all_data = self.request_data(f'renderer_info', timeout=timeout) return all_data def delete_engine(self, engine, version, system_cpu=None): + """ + Sends a request to the server to delete a specific engine. + + Args: + engine (str): The name of the engine to delete. + version (str): The version of the engine to delete. + system_cpu (str, optional): The system CPU type. Defaults to None. + + Returns: + Response: The response from the server. + """ form_data = {'engine': engine, 'version': version, 'system_cpu': system_cpu} hostname = LOOPBACK if self.is_localhost else self.hostname return requests.post(f'http://{hostname}:{self.port}/api/delete_engine', json=form_data) diff --git a/src/distributed_job_manager.py b/src/distributed_job_manager.py index a90dbc1..8f74da4 100644 --- a/src/distributed_job_manager.py +++ b/src/distributed_job_manager.py @@ -61,21 +61,21 @@ class DistributedJobManager: # UI Notifications try: if new_status == RenderStatus.COMPLETED: - logger.debug("show render complete notification") + logger.debug("Show render complete notification") notification.notify( title='Render Job Complete', message=f'{render_job.name} completed succesfully', timeout=10 # Display time in seconds ) elif new_status == RenderStatus.ERROR: - logger.debug("show render complete notification") + logger.debug("Show render error notification") notification.notify( title='Render Job Failed', message=f'{render_job.name} failed rendering', timeout=10 # Display time in seconds ) elif new_status == RenderStatus.RUNNING: - logger.debug("show render complete notification") + logger.debug("Show render started notification") notification.notify( title='Render Job Started', message=f'{render_job.name} started rendering', @@ -89,12 +89,9 @@ class DistributedJobManager: """ Responds to a status change from a remote subjob and triggers the creation or modification of subjobs as needed. - Parameters: - local_job (BaseRenderWorker): The local parent job worker. - subjob_data (dict): subjob data sent from remote server. - - Returns: - None + Args: + local_job (BaseRenderWorker): The local parent job worker. + subjob_data (dict): Subjob data sent from the remote server. """ subjob_status = string_to_status(subjob_data['status']) @@ -221,6 +218,18 @@ class DistributedJobManager: @classmethod def split_into_subjobs(cls, worker, job_data, project_path, system_os=None): + """ + Splits a job into subjobs and distributes them among available servers. + + This method checks the availability of servers, distributes the work among them, and creates subjobs on each server. + If a server is the local host, it adjusts the frame range of the parent job instead of creating a subjob. + + Args: + worker (Worker): The worker that is handling the job. + job_data (dict): The data for the job to be split. + project_path (str): The path to the project associated with the job. + system_os (str, optional): The operating system of the servers. Defaults to None. + """ # Check availability available_servers = cls.find_available_servers(worker.renderer, system_os) @@ -285,17 +294,18 @@ class DistributedJobManager: """ Splits the frame range among available servers proportionally based on their performance (CPU count). - :param start_frame: int, The start frame number of the animation to be rendered. - :param end_frame: int, The end frame number of the animation to be rendered. - :param available_servers: list, A list of available server dictionaries. Each server dictionary should include - 'hostname' and 'cpu_count' keys (see find_available_servers) - :param method: str, Optional. Specifies the distribution method. Possible values are 'cpu_count' and 'equally' + Args: + start_frame (int): The start frame number of the animation to be rendered. + end_frame (int): The end frame number of the animation to be rendered. + available_servers (list): A list of available server dictionaries. Each server dictionary should include + 'hostname' and 'cpu_count' keys (see find_available_servers). + method (str, optional): Specifies the distribution method. Possible values are 'cpu_count' and 'equally'. + Defaults to 'cpu_count'. - - :return: A list of server dictionaries where each dictionary includes the frame range and total number of frames - to be rendered by the server. + Returns: + list: A list of server dictionaries where each dictionary includes the frame range and total number of frames + to be rendered by the server. """ - # Calculate respective frames for each server def divide_frames_by_cpu_count(frame_start, frame_end, servers): total_frames = frame_end - frame_start + 1