"""Unit tests for ``RenderQueue``: lookup, filtering, availability and lifecycle.

Every test receives the ``render_queue_instance`` fixture, which is defined
outside this module. The tests populate that fixture's ``job_queue`` (and, where
needed, its ``session``) and then call the class-level ``RenderQueue`` API, so
the fixture is expected to expose the state those calls read.
NOTE(review): confirm this against the fixture definition.
"""
from datetime import datetime
from unittest.mock import MagicMock, patch

import pytest

from src.render_queue import RenderQueue, JobNotFoundError
from src.utilities.status_utils import RenderStatus


def make_mock_job(job_id: str = 'test-1', status: RenderStatus = RenderStatus.NOT_STARTED,
                  engine_name: str = 'blender', priority: int = 2, **kwargs) -> MagicMock:
    """Build a ``MagicMock`` standing in for a render job.

    Args:
        job_id: Value stored on the mock's ``id`` attribute.
        status: Value stored on the mock's ``status`` attribute.
        engine_name: Value stored on the mock's ``engine_name`` attribute.
        priority: Value stored on the mock's ``priority`` attribute.
        **kwargs: Extra attributes to set on the mock. ``name`` defaults to
            ``'Test Job'`` and ``scheduled_start`` to ``None`` when omitted.
            Keys that collide with the attributes above override them.

    Returns:
        The configured mock.
    """
    job = MagicMock()
    job.id = job_id
    job.status = status
    job.engine_name = engine_name
    job.priority = priority
    # Defaults first, caller overrides last, applied in a single pass.
    attributes = {'name': 'Test Job', 'scheduled_start': None, **kwargs}
    for attr_name, attr_value in attributes.items():
        setattr(job, attr_name, attr_value)
    return job


class TestRenderQueueDefaults:
    """Default state."""

    def test_init_sets_empty_queue(self, render_queue_instance):
        """The queue starts with no jobs."""
        assert render_queue_instance.job_queue == []

    def test_init_sets_max_instances(self, render_queue_instance):
        """The per-engine instance limits match the expected defaults."""
        expected_limits = {'blender': 1, 'aerender': 1, 'ffmpeg': 4}
        assert render_queue_instance.maximum_renderer_instances == expected_limits

    def test_init_is_not_running(self, render_queue_instance):
        """The queue is not running until started."""
        assert render_queue_instance.is_running is False


class TestAllJobs:
    """Returning all jobs."""

    def test_all_jobs_empty_initially(self, render_queue_instance):
        """``all_jobs`` is empty when nothing has been queued."""
        assert RenderQueue.all_jobs() == []

    def test_all_jobs_returns_job_list(self, render_queue_instance):
        """``all_jobs`` returns the jobs held in the queue."""
        job = make_mock_job()
        render_queue_instance.job_queue.append(job)

        all_jobs = RenderQueue.all_jobs()
        assert len(all_jobs) == 1
        assert all_jobs[0] is job


class TestJobsWithStatus:
    """Filtering jobs by status."""

    def test_jobs_with_status_returns_matching(self, render_queue_instance):
        """Only jobs whose status matches are returned."""
        running = make_mock_job('job-1', RenderStatus.RUNNING)
        pending = make_mock_job('job-2', RenderStatus.NOT_STARTED)
        render_queue_instance.job_queue.extend([running, pending])

        result = RenderQueue.jobs_with_status(RenderStatus.RUNNING)
        assert len(result) == 1
        assert result[0].id == 'job-1'

    def test_jobs_with_status_empty_when_no_match(self, render_queue_instance):
        """An empty list is returned when no job has the requested status."""
        job = make_mock_job('job-1', RenderStatus.COMPLETED)
        render_queue_instance.job_queue.append(job)

        result = RenderQueue.jobs_with_status(RenderStatus.RUNNING)
        assert result == []

    def test_jobs_with_status_sorts_by_priority(self, render_queue_instance):
        """With ``priority_sorted=True`` the lowest priority number comes first."""
        high = make_mock_job('job-1', RenderStatus.NOT_STARTED, priority=1)
        low = make_mock_job('job-2', RenderStatus.NOT_STARTED, priority=5)
        # Queue them in the "wrong" order so the sort has work to do.
        render_queue_instance.job_queue.extend([low, high])

        result = RenderQueue.jobs_with_status(RenderStatus.NOT_STARTED, priority_sorted=True)
        # Pin the whole ordering, not just the first element.
        assert [job.id for job in result] == ['job-1', 'job-2']


class TestRunningAndPending:
    """Convenience methods for running/pending."""

    def test_running_jobs(self, render_queue_instance):
        """``running_jobs`` returns only jobs in the RUNNING state."""
        running = make_mock_job('job-1', RenderStatus.RUNNING)
        pending = make_mock_job('job-2', RenderStatus.NOT_STARTED)
        render_queue_instance.job_queue.extend([running, pending])

        result = RenderQueue.running_jobs()
        assert len(result) == 1
        assert result[0].id == 'job-1'

    def test_pending_jobs_includes_not_started_and_scheduled(self, render_queue_instance):
        """``pending_jobs`` covers NOT_STARTED and SCHEDULED but not RUNNING."""
        ns = make_mock_job('job-1', RenderStatus.NOT_STARTED)
        sched = make_mock_job('job-2', RenderStatus.SCHEDULED)
        running = make_mock_job('job-3', RenderStatus.RUNNING)
        render_queue_instance.job_queue.extend([ns, sched, running])

        result = RenderQueue.pending_jobs()
        assert len(result) == 2
        # Check which jobs came back, independent of their order.
        assert {job.id for job in result} == {'job-1', 'job-2'}


class TestJobWithId:
    """Lookup by job ID."""

    def test_finds_job(self, render_queue_instance):
        """A queued job is returned when looked up by its id."""
        job = make_mock_job('abc-123')
        render_queue_instance.job_queue.append(job)

        found = RenderQueue.job_with_id('abc-123')
        assert found is job

    def test_raises_when_not_found(self, render_queue_instance):
        """An unknown id raises ``JobNotFoundError`` mentioning that id."""
        with pytest.raises(JobNotFoundError, match='abc-123'):
            RenderQueue.job_with_id('abc-123')

    def test_returns_none_when_not_found_with_none_ok(self, render_queue_instance):
        """With ``none_ok=True`` an unknown id yields ``None`` instead of raising."""
        result = RenderQueue.job_with_id('abc-123', none_ok=True)
        assert result is None


class TestJobCounts:
    """Counting jobs by status."""

    def test_job_counts_returns_all_statuses(self, render_queue_instance):
        """Counts are keyed by status value and include statuses with no jobs."""
        render_queue_instance.job_queue.extend([
            make_mock_job('j1', RenderStatus.RUNNING),
            make_mock_job('j2', RenderStatus.COMPLETED),
            make_mock_job('j3', RenderStatus.NOT_STARTED),
        ])

        counts = RenderQueue.job_counts()
        assert counts[RenderStatus.RUNNING.value] == 1
        assert counts[RenderStatus.COMPLETED.value] == 1
        assert counts[RenderStatus.NOT_STARTED.value] == 1
        assert counts[RenderStatus.ERROR.value] == 0
        assert counts[RenderStatus.CANCELLED.value] == 0


class TestIsAvailableForJob:
    """Renderer availability checks."""

    def test_available_when_no_running_jobs(self, render_queue_instance):
        """An engine is available when the queue is empty."""
        assert RenderQueue.is_available_for_job('blender') is True

    def test_not_available_when_maxed_out(self, render_queue_instance):
        """One running blender job makes blender unavailable."""
        render_queue_instance.job_queue.append(
            make_mock_job('j1', RenderStatus.RUNNING, engine_name='blender')
        )
        assert RenderQueue.is_available_for_job('blender') is False

    def test_available_for_different_renderer(self, render_queue_instance):
        """A running blender job does not make ffmpeg unavailable."""
        render_queue_instance.job_queue.append(
            make_mock_job('j1', RenderStatus.RUNNING, engine_name='blender')
        )
        assert RenderQueue.is_available_for_job('ffmpeg') is True

    def test_blocked_by_higher_priority(self, render_queue_instance):
        """A request at priority 2 is refused while a priority-0 job is running."""
        # NOTE(review): the running job also occupies blender's only slot (see the
        # default limits asserted in TestRenderQueueDefaults), so this assertion
        # would hold even without any priority handling. Consider an engine with
        # spare capacity to isolate the priority rule.
        render_queue_instance.job_queue.append(
            make_mock_job('j1', RenderStatus.RUNNING, engine_name='blender', priority=0)
        )
        assert RenderQueue.is_available_for_job('blender', priority=2) is False

    def test_ffmpeg_allows_multiple_instances(self, render_queue_instance):
        """ffmpeg is reported unavailable once four ffmpeg jobs are running."""
        running_ffmpeg_jobs = [
            make_mock_job(f'j{i}', RenderStatus.RUNNING, engine_name='ffmpeg')
            for i in range(4)
        ]
        render_queue_instance.job_queue.extend(running_ffmpeg_jobs)
        assert RenderQueue.is_available_for_job('ffmpeg') is False


class TestAddToRenderQueue:
    """Adding jobs."""

    def test_add_job_appends_to_queue(self, render_queue_instance):
        """An added job ends up in the queue."""
        render_queue_instance.session = MagicMock()
        job = make_mock_job()

        RenderQueue.add_to_render_queue(job)
        assert job in render_queue_instance.job_queue

    def test_add_job_saves_state(self, render_queue_instance):
        """Adding a job registers it with the session exactly once."""
        render_queue_instance.session = MagicMock()
        job = make_mock_job()

        RenderQueue.add_to_render_queue(job)
        render_queue_instance.session.add.assert_called_once_with(job)

    def test_add_job_force_start_calls_start(self, render_queue_instance):
        """``force_start=True`` starts a NOT_STARTED job immediately."""
        render_queue_instance.session = MagicMock()
        job = make_mock_job(status=RenderStatus.NOT_STARTED)

        with patch.object(render_queue_instance, '_start_job') as mock_start:
            RenderQueue.add_to_render_queue(job, force_start=True)
            mock_start.assert_called_once_with(job)

    def test_add_job_force_start_skips_completed(self, render_queue_instance):
        """``force_start=True`` does not start a job that is already COMPLETED."""
        render_queue_instance.session = MagicMock()
        job = make_mock_job(status=RenderStatus.COMPLETED)

        with patch.object(render_queue_instance, '_start_job') as mock_start:
            RenderQueue.add_to_render_queue(job, force_start=True)
            mock_start.assert_not_called()

    def test_add_job_evaluates_queue_when_running(self, render_queue_instance):
        """Adding a job while the queue is running triggers an evaluation."""
        render_queue_instance.session = MagicMock()
        render_queue_instance.is_running = True
        job = make_mock_job()

        with patch.object(render_queue_instance, '_evaluate_queue') as mock_eval:
            RenderQueue.add_to_render_queue(job)
            mock_eval.assert_called_once()


class TestStartCancelDelete:
    """Job lifecycle."""

    def test_start_job_calls_job_start(self, render_queue_instance):
        """``start_job`` calls the job's own ``start`` once."""
        job = make_mock_job()

        # _save_state is patched out so the test does not depend on it.
        with patch.object(render_queue_instance, '_save_state'):
            RenderQueue.start_job(job)
        job.start.assert_called_once()

    def test_cancel_job_calls_job_stop(self, render_queue_instance):
        """``cancel_job`` stops the job and returns True."""
        job = make_mock_job()
        job.stop.return_value = None
        # The mocked stop() cannot update the status, so it is pre-set here.
        # NOTE(review): presumably cancel_job derives its result from job.status;
        # confirm against the implementation.
        job.status = RenderStatus.CANCELLED

        result = RenderQueue.cancel_job(job)
        assert result is True
        job.stop.assert_called_once()

    def test_delete_job_removes_from_queue(self, render_queue_instance):
        """``delete_job`` removes the job from the queue and from the session."""
        job = make_mock_job()
        render_queue_instance.job_queue.append(job)
        render_queue_instance.session = MagicMock()

        result = RenderQueue.delete_job(job)
        assert result is True
        assert job not in render_queue_instance.job_queue
        render_queue_instance.session.delete.assert_called_once_with(job)


class TestStartStop:
    """Starting and stopping the queue evaluation loop."""

    def test_start_sets_running_and_evaluates(self, render_queue_instance):
        """``start`` sets the running flag and evaluates the queue once."""
        with patch.object(render_queue_instance, '_evaluate_queue') as mock_eval:
            RenderQueue.start()
            assert render_queue_instance.is_running is True
            mock_eval.assert_called_once()

    def test_stop_clears_running(self, render_queue_instance):
        """``stop`` clears the running flag."""
        render_queue_instance.is_running = True

        RenderQueue.stop()
        assert render_queue_instance.is_running is False


class TestEvaluateQueue:
    """Queue evaluation dispatches jobs."""

    def test_evaluate_starts_not_started_jobs(self, render_queue_instance):
        """A NOT_STARTED job is started when the queue is evaluated."""
        job = make_mock_job()
        render_queue_instance.job_queue.append(job)

        with patch.object(render_queue_instance, '_start_job') as mock_start, \
                patch.object(render_queue_instance, '_save_state'):
            RenderQueue.evaluate_queue()
            mock_start.assert_called_once_with(job)

    def test_evaluate_respects_max_instances(self, render_queue_instance):
        """A waiting blender job is not started while another blender job runs."""
        # One already running, only 1 slot for blender
        render_queue_instance.job_queue.append(
            make_mock_job('running-1', RenderStatus.RUNNING, engine_name='blender')
        )
        waiting = make_mock_job('waiting-1', RenderStatus.NOT_STARTED, engine_name='blender')
        render_queue_instance.job_queue.append(waiting)

        with patch.object(render_queue_instance, '_start_job') as mock_start, \
                patch.object(render_queue_instance, '_save_state'):
            RenderQueue.evaluate_queue()
            mock_start.assert_not_called()

    def test_evaluate_starts_scheduled_jobs(self, render_queue_instance):
        """A SCHEDULED job whose start time is in the past is started."""
        past = datetime(2020, 1, 1)
        job = make_mock_job(status=RenderStatus.SCHEDULED, scheduled_start=past)
        render_queue_instance.job_queue.append(job)

        with patch.object(render_queue_instance, '_start_job') as mock_start, \
                patch.object(render_queue_instance, '_save_state'):
            RenderQueue.evaluate_queue()
            mock_start.assert_called_once_with(job)


class TestClearHistory:
    """Clearing completed/cancelled/error jobs."""

    def test_clear_history_removes_finished_jobs(self, render_queue_instance):
        """Only the RUNNING job remains after ``clear_history``."""
        render_queue_instance.session = MagicMock()
        jobs = [
            make_mock_job('c', RenderStatus.COMPLETED),
            make_mock_job('e', RenderStatus.ERROR),
            make_mock_job('x', RenderStatus.CANCELLED),
            make_mock_job('r', RenderStatus.RUNNING),
        ]
        render_queue_instance.job_queue.extend(jobs)

        RenderQueue.clear_history()
        assert len(render_queue_instance.job_queue) == 1
        assert render_queue_instance.job_queue[0].id == 'r'