Windows path fixes (#129)

* Change uses of os.path to use Pathlib

* Add return types and type hints

* Add more docstrings

* Add missing import to api_server
This commit is contained in:
2026-01-18 00:18:43 -06:00
committed by GitHub
parent ee9f44e4c4
commit 74dce5cc3d
15 changed files with 536 additions and 262 deletions
+13 -16
View File
@@ -4,6 +4,7 @@ import socket
import threading
import time
from click import Path
from plyer import notification
from pubsub import pub
@@ -70,7 +71,7 @@ class DistributedJobManager:
logger.error(f"Error notifying parent {parent_hostname} about update in subjob {render_job.id}: {e}")
@classmethod
def __local_job_status_changed(cls, job_id, old_status, new_status):
def __local_job_status_changed(cls, job_id: str, old_status: str, new_status: str):
"""
Responds to the 'status_change' pubsub message for local jobs.
If it's a child job, it notifies the parent job about the status change.
@@ -129,13 +130,13 @@ class DistributedJobManager:
# --------------------------------------------
@classmethod
def create_render_job(cls, new_job_attributes, loaded_project_local_path):
def create_render_job(cls, new_job_attributes: dict, loaded_project_local_path: Path):
"""Creates render jobs. Pass in dict of job_data and the local path to the project. It creates and returns a new
render job.
Args:
new_job_attributes (dict): Dict of desired attributes for new job (frame count, renderer, output path, etc)
loaded_project_local_path (str): The local path to the loaded project.
loaded_project_local_path (Path): The local path to the loaded project.
Returns:
worker: Created job worker
@@ -143,15 +144,11 @@ class DistributedJobManager:
# get new output path in output_dir
output_path = new_job_attributes.get('output_path')
if not output_path:
loaded_project_filename = os.path.basename(loaded_project_local_path)
output_filename = os.path.splitext(loaded_project_filename)[0]
else:
output_filename = os.path.basename(output_path)
output_filename = loaded_project_local_path.name if output_path else loaded_project_local_path.stem
# Prepare output path
output_dir = os.path.join(os.path.dirname(os.path.dirname(loaded_project_local_path)), 'output')
output_path = os.path.join(output_dir, output_filename)
output_dir = loaded_project_local_path.parent.parent / "output"
output_path = output_dir / output_filename
os.makedirs(output_dir, exist_ok=True)
logger.debug(f"New job output path: {output_path}")
@@ -186,7 +183,7 @@ class DistributedJobManager:
# --------------------------------------------
@classmethod
def handle_subjob_update_notification(cls, local_job, subjob_data):
def handle_subjob_update_notification(cls, local_job, subjob_data: dict):
"""Responds to a notification from a remote subjob and the host requests any subsequent updates from the subjob.
Args:
@@ -347,7 +344,7 @@ class DistributedJobManager:
RenderServerProxy(parent_worker.hostname).cancel_job(parent_worker.id, confirm=True)
@staticmethod
def __create_subjob(new_job_attributes, project_path, server_data, server_hostname, parent_worker):
def __create_subjob(new_job_attributes: dict, project_path, server_data, server_hostname: str, parent_worker):
"""Convenience method to create subjobs for a parent worker"""
subjob = new_job_attributes.copy()
subjob['name'] = f"{parent_worker.name}[{server_data['frame_range'][0]}-{server_data['frame_range'][-1]}]"
@@ -366,7 +363,7 @@ class DistributedJobManager:
# --------------------------------------------
@staticmethod
def find_available_servers(engine_name, system_os=None):
def find_available_servers(engine_name: str, system_os=None):
"""
Scan the Zeroconf network for currently available render servers supporting a specific engine.
@@ -375,16 +372,16 @@ class DistributedJobManager:
:return: A list of dictionaries with each dict containing hostname and cpu_count of available servers
"""
from api.api_server import API_VERSION
available_servers = []
found_available_servers = []
for hostname in ZeroconfServer.found_hostnames():
host_properties = ZeroconfServer.get_hostname_properties(hostname)
if host_properties.get('api_version') == API_VERSION:
if not system_os or (system_os and system_os == host_properties.get('system_os')):
response = RenderServerProxy(hostname).is_engine_available(engine_name)
if response and response.get('available', False):
available_servers.append(response)
found_available_servers.append(response)
return available_servers
return found_available_servers
if __name__ == '__main__':