Files
Zordon/tests/test_distributed_job_manager.py
brett c38213fb58 Update and modernize create-executables action (#138)
* Modernize create-executables.yml

* Update version numbers

* Fix API version in test
2026-06-06 17:10:53 -05:00

204 lines
7.7 KiB
Python

from unittest.mock import MagicMock, patch
import src.distributed_job_manager as djm_module
from src.distributed_job_manager import DistributedJobManager
from src.utilities.status_utils import RenderStatus
class TestSubscribeToListener:
    """Checks that the manager registers its PubSub callbacks."""

    def test_subscribes_to_status_change(self, distributed_job_manager_instance):
        """Both local-job callbacks are subscribed to their respective topics."""
        manager = distributed_job_manager_instance
        manager._subscribe_to_listener()

        # Inspect the `pub` object through the module under test, because that
        # module-level reference is the one _subscribe_to_listener uses.
        subscribe = djm_module.pub.subscribe
        expected_subscriptions = (
            (manager._local_job_status_changed, 'status_change'),
            (manager._local_job_frame_complete, 'frame_complete'),
        )
        for callback, topic in expected_subscriptions:
            subscribe.assert_any_call(callback, topic)
class TestCreateRenderJob:
    """Exercises DistributedJobManager.create_render_job with mocked collaborators."""

    @patch('src.distributed_job_manager.os.makedirs')
    @patch('src.distributed_job_manager.EngineManager.create_worker')
    def test_creates_worker_and_adds_to_queue(
        self,
        mock_create_worker,
        mock_makedirs,
        distributed_job_manager_instance,
        config_instance,
        tmp_path,
    ):
        """The created worker is returned, marked NOT_STARTED and queued once."""
        blend_file = tmp_path / 'test_project.blend'
        blend_file.write_text('fake')

        fake_worker = MagicMock()
        fake_worker.id = 'job-1'
        fake_worker.parent = None
        fake_worker.total_frames = 10
        mock_create_worker.return_value = fake_worker

        job_attrs = {
            'engine_name': 'blender',
            'args': {'engine': 'CYCLES'},
            'name': 'Test Job',
            'start_frame': 1,
            'end_frame': 10,
            'priority': 2,
            'enable_split_jobs': False,
        }

        queue_patch = patch('src.distributed_job_manager.RenderQueue.add_to_render_queue')
        previews_patch = patch('src.distributed_job_manager.PreviewManager.update_previews_for_job')
        with queue_patch as mock_add, previews_patch:
            created = DistributedJobManager.create_render_job(job_attrs, blend_file)

        # With no explicit output_path, the test expects the project's stem
        # under a sibling 'output' directory.
        expected_output = blend_file.parent.parent / 'output' / 'test_project'
        assert created == fake_worker
        assert fake_worker.status == RenderStatus.NOT_STARTED
        assert mock_create_worker.call_args.kwargs['output_path'] == expected_output
        mock_add.assert_called_once_with(fake_worker, force_start=False)

    @patch('src.distributed_job_manager.os.makedirs')
    @patch('src.distributed_job_manager.EngineManager.create_worker')
    def test_uses_requested_output_path(
        self,
        mock_create_worker,
        mock_makedirs,
        distributed_job_manager_instance,
        config_instance,
        tmp_path,
    ):
        """An 'output_path' attribute replaces the default output name."""
        blend_file = tmp_path / 'test_project.blend'
        blend_file.write_text('fake')

        fake_worker = MagicMock()
        fake_worker.parent = None
        fake_worker.total_frames = 10
        mock_create_worker.return_value = fake_worker

        job_attrs = {
            'engine_name': 'blender',
            'args': {},
            'name': 'Camera Job',
            'output_path': 'test_project_Camera-001',
            'enable_split_jobs': False,
        }

        queue_patch = patch('src.distributed_job_manager.RenderQueue.add_to_render_queue')
        previews_patch = patch('src.distributed_job_manager.PreviewManager.update_previews_for_job')
        with queue_patch, previews_patch:
            DistributedJobManager.create_render_job(job_attrs, blend_file)

        expected_output = blend_file.parent.parent / 'output' / 'test_project_Camera-001'
        assert mock_create_worker.call_args.kwargs['output_path'] == expected_output

    @patch('src.distributed_job_manager.os.makedirs')
    @patch('src.distributed_job_manager.EngineManager.create_worker')
    def test_uses_output_subdir_when_requested(
        self,
        mock_create_worker,
        mock_makedirs,
        distributed_job_manager_instance,
        config_instance,
        tmp_path,
    ):
        """With '__use_output_subdir', output is nested in a same-named directory."""
        blend_file = tmp_path / 'test_project.blend'
        blend_file.write_text('fake')

        fake_worker = MagicMock()
        fake_worker.parent = None
        fake_worker.total_frames = 10
        mock_create_worker.return_value = fake_worker

        job_attrs = {
            'engine_name': 'blender',
            'args': {},
            'name': 'Camera Job',
            'output_path': 'test_project_Camera-001',
            '__use_output_subdir': True,
            'enable_split_jobs': False,
        }

        queue_patch = patch('src.distributed_job_manager.RenderQueue.add_to_render_queue')
        previews_patch = patch('src.distributed_job_manager.PreviewManager.update_previews_for_job')
        with queue_patch, previews_patch:
            DistributedJobManager.create_render_job(job_attrs, blend_file)

        subdir = blend_file.parent.parent / 'output' / 'test_project_Camera-001'
        requested_output = mock_create_worker.call_args.kwargs['output_path']
        assert requested_output == subdir / 'test_project_Camera-001'
        mock_makedirs.assert_called_with(subdir, exist_ok=True)

    @patch('src.distributed_job_manager.os.makedirs')
    @patch('src.distributed_job_manager.EngineManager.create_worker')
    def test_split_jobs_enabled_calls_split_async(
        self,
        mock_create_worker,
        mock_makedirs,
        distributed_job_manager_instance,
        config_instance,
        tmp_path,
    ):
        """Enabling split jobs hands the worker to _split_into_subjobs_async."""
        blend_file = tmp_path / 'test_project.blend'
        blend_file.write_text('fake')

        fake_worker = MagicMock()
        fake_worker.parent = None
        fake_worker.total_frames = 10
        mock_create_worker.return_value = fake_worker

        job_attrs = {
            'engine_name': 'blender',
            'args': {},
            'name': 'Split Job',
            'start_frame': 1,
            'end_frame': 10,
            'priority': 2,
            'enable_split_jobs': True,
        }

        split_patch = patch.object(
            distributed_job_manager_instance, '_split_into_subjobs_async',
        )
        queue_patch = patch('src.distributed_job_manager.RenderQueue.add_to_render_queue')
        previews_patch = patch('src.distributed_job_manager.PreviewManager.update_previews_for_job')
        with split_patch as mock_split, queue_patch, previews_patch:
            DistributedJobManager.create_render_job(job_attrs, blend_file)

        # The trailing None is system_os, which the forwarder passes as None
        # by default.
        mock_split.assert_called_once_with(fake_worker, job_attrs, blend_file, None)
class TestHandleSubjobUpdate:
    """Covers handling of update notifications sent by subjobs."""

    def test_updates_child_info(self, distributed_job_manager_instance):
        """A completed subjob is recorded under '<id>@<hostname>' on the parent."""
        parent = MagicMock()
        parent.children = {}
        update = {
            'id': 'sub-1',
            'hostname': 'worker-1',
            'status': 'completed',
            'percent_complete': 1.0,
            'file_count': 5,
        }

        # Stub the frame download so it reports success (return_value=True).
        download_patch = patch(
            'src.distributed_job_manager.download_missing_frames_from_subjob',
            return_value=True,
        )
        with download_patch:
            DistributedJobManager.handle_subjob_update_notification(parent, update)

        child_key = 'sub-1@worker-1'
        assert child_key in parent.children
        assert parent.children[child_key]['download_status'] == 'completed'
class TestFindAvailableServers:
    """Covers discovery of remote render servers."""

    @patch('src.distributed_job_manager.ZeroconfServer.found_hostnames')
    @patch('src.distributed_job_manager.ZeroconfServer.get_hostname_properties')
    @patch('src.distributed_job_manager.RenderServerProxy')
    def test_finds_matching_server(
        self,
        mock_proxy_class,
        mock_get_props,
        mock_found_hostnames,
    ):
        """A host reporting the current API version and an available engine is returned."""
        hostname = 'server-1.local'

        # Every RenderServerProxy(...) call yields this stub, which reports
        # the engine as available.
        proxy_stub = MagicMock()
        proxy_stub.get_engine_availability.return_value = {
            'available': True,
            'hostname': 'server-1.local',
        }
        mock_proxy_class.return_value = proxy_stub

        mock_found_hostnames.return_value = [hostname]

        # Imported inside the test, as in the original, to advertise the
        # current API_VERSION.
        from src.api.api_server import API_VERSION
        mock_get_props.return_value = {'api_version': API_VERSION, 'system_os': 'macos'}

        servers = DistributedJobManager.find_available_servers('blender')

        assert len(servers) == 1
        assert servers[0]['hostname'] == hostname
        proxy_stub.get_engine_availability.assert_called_once_with('blender')