mirror of
https://github.com/blw1138/Zordon.git
synced 2026-06-09 13:39:24 -05:00
86 lines
3.0 KiB
Python
86 lines
3.0 KiB
Python
import logging
|
|
import os
|
|
import time
|
|
import unittest
|
|
|
|
from src.api.server_proxy import RenderServerProxy
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
SERVER_HOST = os.environ.get('ZORDON_TEST_HOST', '127.0.0.1')
|
|
SERVER_PORT = os.environ.get('ZORDON_TEST_PORT', '8080')
|
|
|
|
|
|
@unittest.skipIf(os.environ.get('ZORDON_SKIP_INTEGRATION'),
|
|
'set ZORDON_SKIP_INTEGRATION to skip integration tests')
|
|
class SubmissionTestCase(unittest.TestCase):
|
|
"""Integration tests requiring a running Zordon server.
|
|
|
|
Start the server: python server.py
|
|
Run tests: ZORDON_TEST_HOST=127.0.0.1 python -m pytest tests/job_creation_tests.py
|
|
|
|
Override host/port via ZORDON_TEST_HOST and ZORDON_TEST_PORT env vars.
|
|
"""
|
|
|
|
render_server = None
|
|
test_job_id = None
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
cls.render_server = RenderServerProxy(SERVER_HOST, SERVER_PORT)
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
cls.render_server = None
|
|
|
|
def test_connection_ok(self):
|
|
self.assertTrue(self.render_server.is_online(),
|
|
msg=f'Server not reachable at {SERVER_HOST}:{SERVER_PORT}')
|
|
|
|
def test_submit_job(self):
|
|
sample_file_path = os.path.join(os.path.dirname(__file__), 'resources',
|
|
'batman_sample.blend')
|
|
self.assertTrue(os.path.exists(sample_file_path),
|
|
msg=f'Test file not found: {sample_file_path}')
|
|
|
|
sample_job = {
|
|
'name': 'sample_blender_job',
|
|
'renderer': 'blender',
|
|
'start_frame': 1,
|
|
'end_frame': 5,
|
|
'args': {'engine': 'CYCLES'},
|
|
'enable_split_jobs': False,
|
|
}
|
|
|
|
response = self.render_server.create_job(
|
|
file_path=sample_file_path, job_data=sample_job)
|
|
self.assertIsNotNone(response, msg='No response from server')
|
|
self.assertTrue(response.ok, msg=f'Server returned {response.status_code}')
|
|
|
|
logger.info('Submitted to server ok!')
|
|
job_data = response.json()
|
|
self.__class__.test_job_id = job_data[0]['id']
|
|
|
|
def test_wait_for_job_to_complete(self):
|
|
if not self.__class__.test_job_id:
|
|
self.skipTest('No job was submitted in test_submit_job')
|
|
|
|
file_count = 0
|
|
|
|
while True:
|
|
update_response = self.render_server.get_job(self.__class__.test_job_id)
|
|
if update_response:
|
|
print(f"Status: {update_response['status']}")
|
|
|
|
if update_response['status'] not in [
|
|
'not_started', 'running', 'waiting_for_subjobs', 'configuring'
|
|
]:
|
|
self.assertEqual(update_response['status'], 'completed',
|
|
msg=f"Job ended with status: {update_response['status']}")
|
|
break
|
|
|
|
if update_response['file_count'] > file_count:
|
|
file_count = update_response['file_count']
|
|
print(f"File count is now {file_count}")
|
|
time.sleep(1)
|