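"""Client-side proxy for talking to a Zordon render server over its HTTP API.

Wraps the server's REST endpoints (status, job listing, job submission,
cancellation, and file download) and can optionally poll the job list on a
background thread.
"""
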
import json
import logging
import os
import threading
import time

import requests
from requests_toolbelt.multipart import MultipartEncoder, MultipartEncoderMonitor

from lib.workers.base_worker import RenderStatus

# Display colors for each render status.
status_colors = {
    RenderStatus.ERROR: 'red',
    RenderStatus.CANCELLED: 'orange1',
    RenderStatus.COMPLETED: 'green',
    RenderStatus.NOT_STARTED: 'yellow',
    RenderStatus.SCHEDULED: 'purple',
    RenderStatus.RUNNING: 'cyan',
}

# Display order used when sorting the job cache.
categories = [RenderStatus.RUNNING, RenderStatus.ERROR, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED,
              RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.UNDEFINED]

logger = logging.getLogger(__name__)

# Number of consecutive failed requests before the server is considered offline.
OFFLINE_MAX = 2


class RenderServerProxy:
    """Proxy object for a single render server host."""

    def __init__(self, hostname, server_port="8080"):
        self.hostname = hostname
        self.port = server_port
        self.fetched_status_data = None
        self.__jobs_cache_token = None   # change token from the last /jobs response
        self.__jobs_cache = []           # jobs sorted into display order
        self.__update_in_background = False
        self.__background_thread = None
        self.__offline_flags = 0         # consecutive failed requests
        self.update_cadence = 5          # seconds between background cache refreshes

    def connect(self):
        """Return the server's status payload, or None if unreachable."""
        return self.request_data('status')

    def is_online(self):
        if self.__update_in_background:
            # The background thread is already polling; trust its failure count.
            return self.__offline_flags < OFFLINE_MAX
        return self.connect() is not None

    def status(self):
        # Summarize from the cached job list (populated by the background poller
        # or by get_all_jobs).
        if not self.is_online():
            return "Offline"
        running_jobs = [x for x in self.__jobs_cache if x['status'] == 'running'] if self.__jobs_cache else []
        return f"{len(running_jobs)} running" if running_jobs else "Available"

    def request_data(self, payload, timeout=5):
        """GET an API endpoint and return the decoded JSON, or None on any failure."""
        try:
            req = self.request(payload, timeout)
            if req.status_code == 200:
                self.__offline_flags = 0
                return req.json()
        except json.JSONDecodeError as e:
            logger.debug(f"JSON decode error: {e}")
        except requests.ReadTimeout as e:
            logger.warning(f"Timed out: {e}")
            self.__offline_flags += 1
        except requests.ConnectionError as e:
            logger.warning(f"Connection error: {e}")
            self.__offline_flags += 1
        except Exception as e:
            logger.exception(f"Uncaught exception: {e}")
        return None

    def request(self, payload, timeout=5):
        return requests.get(f'http://{self.hostname}:{self.port}/api/{payload}', timeout=timeout)

    def start_background_update(self):
        """Refresh the job cache every `update_cadence` seconds on a daemon thread."""
        self.__update_in_background = True

        def thread_worker():
            while self.__update_in_background:
                self.__update_job_cache()
                time.sleep(self.update_cadence)

        self.__background_thread = threading.Thread(target=thread_worker, daemon=True)
        self.__background_thread.start()

    def stop_background_update(self):
        self.__update_in_background = False

    def get_job_info(self, job_id, timeout=5):
        return self.request_data(f'job/{job_id}', timeout=timeout)

    def get_all_jobs(self, timeout=5, ignore_token=False):
        # Refresh synchronously unless the background thread is keeping the cache warm.
        if not self.__update_in_background or ignore_token:
            self.__update_job_cache(timeout, ignore_token)
        return self.__jobs_cache.copy() if self.__jobs_cache else None

    def __update_job_cache(self, timeout=5, ignore_token=False):
        # Send the server's change token back so it can short-circuit unchanged results.
        url = f'jobs?token={self.__jobs_cache_token}' if self.__jobs_cache_token and not ignore_token else 'jobs'
        status_result = self.request_data(url, timeout=timeout)
        if status_result is not None:
            # Re-sort jobs into display order (running first, undefined last).
            sorted_jobs = []
            for status_category in categories:
                sorted_jobs.extend(x for x in status_result['jobs'] if x['status'] == status_category.value)
            self.__jobs_cache = sorted_jobs
            self.__jobs_cache_token = status_result['token']

    def get_data(self, timeout=5):
        return self.request_data('full_status', timeout=timeout)

    def cancel_job(self, job_id, confirm=False):
        return self.request_data(f'job/{job_id}/cancel?confirm={confirm}')

    def is_available_for_job(self, renderer, priority=2):
        request_data = self.request_data(f'is_available_for_job?renderer={renderer}&priority={priority}',
                                         timeout=1) or {}
        return request_data.get('is_available', False)

    def post_job_to_server(self, input_path, job_list, callback=None):
        # Stream the project file and the job description as multipart form data;
        # the with-block ensures the file handle is closed after the upload.
        with open(input_path, 'rb') as job_file:
            encoder = MultipartEncoder({
                'file': (os.path.basename(input_path), job_file, 'application/octet-stream'),
                'json': (None, json.dumps(job_list), 'application/json'),
            })

            # Wrap the encoder in a monitor so upload progress can be tracked.
            # `callback` is a factory: it receives the encoder and must return
            # the per-read callback the monitor will invoke.
            if callback:
                monitor = MultipartEncoderMonitor(encoder, callback(encoder))
            else:
                monitor = MultipartEncoderMonitor(encoder)

            # Send the request
            headers = {'Content-Type': monitor.content_type}
            response = requests.post(f'http://{self.hostname}:{self.port}/api/add_job',
                                     data=monitor, headers=headers)

        return response
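
    # Example progress-callback factory (a sketch; `make_progress_callback` is
    # hypothetical, not part of this module). post_job_to_server calls
    # callback(encoder) once, so the factory returns the function the monitor
    # invokes after every read:
    #
    #     def make_progress_callback(encoder):
    #         def on_read(monitor):
    #             print(f"uploaded {monitor.bytes_read}/{encoder.len} bytes")
    #         return on_read
    #
    #     proxy.post_job_to_server(path, jobs, callback=make_progress_callback)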

    def get_job_files(self, job_id, save_path):
        url = f"http://{self.hostname}:{self.port}/api/job/{job_id}/download_all"
        return self.download_file(url, filename=save_path)

    @staticmethod
    def download_file(url, filename):
        # Stream the response to disk in chunks so large renders never sit in memory.
        with requests.get(url, stream=True) as r:
            r.raise_for_status()
            with open(filename, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        return filename
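

# Minimal usage sketch (assumes a Zordon render server reachable on
# "localhost" at the default port):
#
#     proxy = RenderServerProxy("localhost")
#     if proxy.connect():
#         proxy.start_background_update()
#         print(proxy.status())            # e.g. "Available" or "2 running"
#         jobs = proxy.get_all_jobs() or []
#         proxy.stop_background_update()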