import json import logging import os import threading import time import requests from requests_toolbelt.multipart import MultipartEncoder, MultipartEncoderMonitor from urllib.parse import urljoin from src.utilities.misc_helper import is_localhost from src.utilities.status_utils import RenderStatus status_colors = {RenderStatus.ERROR: "red", RenderStatus.CANCELLED: 'orange1', RenderStatus.COMPLETED: 'green', RenderStatus.NOT_STARTED: "yellow", RenderStatus.SCHEDULED: 'purple', RenderStatus.RUNNING: 'cyan', RenderStatus.WAITING_FOR_SUBJOBS: 'blue'} categories = [RenderStatus.RUNNING, RenderStatus.WAITING_FOR_SUBJOBS, RenderStatus.ERROR, RenderStatus.NOT_STARTED, RenderStatus.SCHEDULED, RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.UNDEFINED, RenderStatus.CONFIGURING] logger = logging.getLogger() OFFLINE_MAX = 4 class RenderServerProxy: """The ServerProxy class is responsible for interacting with a remote server. It provides convenience methods to request data from the server and store the status of the server. """ def __init__(self, hostname, server_port="8080"): self.hostname = hostname self.port = server_port self.fetched_status_data = None self.__jobs_cache_token = None self.__jobs_cache = [] self.__update_in_background = False self.__background_thread = None self.__offline_flags = 0 self.update_cadence = 5 self.is_localhost = bool(is_localhost(hostname)) # Cache some basic server info self.system_cpu = None self.system_cpu_count = None self.system_os = None self.system_os_version = None self.system_api_version = None # -------------------------------------------- # Basics / Connection: # -------------------------------------------- def __repr__(self): return f"" def check_connection(self): try: return self.request("heartbeat").ok except Exception: pass return False def is_online(self): if self.__update_in_background: return self.__offline_flags < OFFLINE_MAX else: return self.check_connection() def status(self): if not self.is_online(): return "Offline" running_jobs = [x for x in self.__jobs_cache if x['status'] == 'running'] if self.__jobs_cache else [] return f"{len(running_jobs)} running" if running_jobs else "Ready" # -------------------------------------------- # Requests: # -------------------------------------------- def request_data(self, payload, timeout=5): try: req = self.request(payload, timeout) if req.ok: self.__offline_flags = 0 if req.status_code == 200: return req.json() except json.JSONDecodeError as e: logger.debug(f"JSON decode error: {e}") except requests.ReadTimeout as e: logger.warning(f"Timed out: {e}") self.__offline_flags = self.__offline_flags + 1 except requests.ConnectionError as e: logger.warning(f"Connection error: {e}") self.__offline_flags = self.__offline_flags + 1 except Exception as e: logger.exception(f"Uncaught exception: {e}") # If server unexpectedly drops off the network, stop background updates if self.__offline_flags > OFFLINE_MAX: try: self.stop_background_update() except KeyError: pass return None def request(self, payload, timeout=5): from src.api.api_server import API_VERSION return requests.get(f'http://{self.hostname}:{self.port}/api/{payload}', timeout=timeout, headers={"X-API-Version": str(API_VERSION)}) # -------------------------------------------- # Background Updates: # -------------------------------------------- def start_background_update(self): if self.__update_in_background: return self.__update_in_background = True def thread_worker(): logger.debug(f'Starting background updates for {self.hostname}') while self.__update_in_background: self.__update_job_cache() time.sleep(self.update_cadence) logger.debug(f'Stopping background updates for {self.hostname}') self.__background_thread = threading.Thread(target=thread_worker) self.__background_thread.daemon = True self.__background_thread.start() def __update_job_cache(self, timeout=40, ignore_token=False): if self.__offline_flags: # if we're offline, don't bother with the long poll ignore_token = True url = f'jobs_long_poll?token={self.__jobs_cache_token}' if (self.__jobs_cache_token and not ignore_token) else 'jobs' status_result = self.request_data(url, timeout=timeout) if status_result is not None: sorted_jobs = [] for status_category in categories: found_jobs = [x for x in status_result['jobs'] if x['status'] == status_category.value] if found_jobs: sorted_jobs.extend(found_jobs) self.__jobs_cache = sorted_jobs self.__jobs_cache_token = status_result['token'] def stop_background_update(self): self.__update_in_background = False # -------------------------------------------- # Get System Info: # -------------------------------------------- def get_all_jobs(self, timeout=5, ignore_token=False): if not self.__update_in_background or ignore_token: self.__update_job_cache(timeout, ignore_token) return self.__jobs_cache.copy() if self.__jobs_cache else None def get_data(self, timeout=5): return self.request_data('full_status', timeout=timeout) def get_status(self): status = self.request_data('status') if status and not self.system_cpu: self.system_cpu = status['system_cpu'] self.system_cpu_count = status['cpu_count'] self.system_os = status['system_os'] self.system_os_version = status['system_os_version'] self.system_api_version = status['api_version'] return status # -------------------------------------------- # Get Job Info: # -------------------------------------------- def get_job_info(self, job_id, timeout=5): return self.request_data(f'job/{job_id}', timeout=timeout) def get_job_files_list(self, job_id): return self.request_data(f"job/{job_id}/file_list") # -------------------------------------------- # Job Lifecycle: # -------------------------------------------- def post_job_to_server(self, file_path, job_list, callback=None): """ Posts a job to the server. Args: file_path (str): The path to the file to upload. job_list (list): A list of jobs to post. callback (function, optional): A callback function to call during the upload. Defaults to None. Returns: Response: The response from the server. """ try: # Check if file exists if not os.path.exists(file_path): raise FileNotFoundError(f"File not found: {file_path}") # Bypass uploading file if posting to localhost if self.is_localhost: jobs_with_path = [{'local_path': file_path, **item} for item in job_list] job_data = json.dumps(jobs_with_path) url = urljoin(f'http://{self.hostname}:{self.port}', '/api/add_job') headers = {'Content-Type': 'application/json'} return requests.post(url, data=job_data, headers=headers) # Prepare the form data for remote host with open(file_path, 'rb') as file: encoder = MultipartEncoder({ 'file': (os.path.basename(file_path), file, 'application/octet-stream'), 'json': (None, json.dumps(job_list), 'application/json'), }) # Create a monitor that will track the upload progress monitor = MultipartEncoderMonitor(encoder, callback) if callback else MultipartEncoderMonitor(encoder) headers = {'Content-Type': monitor.content_type} url = urljoin(f'http://{self.hostname}:{self.port}', '/api/add_job') # Send the request with proper resource management with requests.post(url, data=monitor, headers=headers) as response: return response except requests.ConnectionError as e: logger.error(f"Connection error: {e}") except Exception as e: logger.error(f"An error occurred: {e}") def cancel_job(self, job_id, confirm=False): return self.request_data(f'job/{job_id}/cancel?confirm={confirm}') def delete_job(self, job_id, confirm=False): return self.request_data(f'job/{job_id}/delete?confirm={confirm}') def send_subjob_update_notification(self, parent_id, subjob): """ Notifies the parent job of an update in a subjob. Args: parent_id (str): The ID of the parent job. subjob (Job): The subjob that has updated. Returns: Response: The response from the server. """ return requests.post(f'http://{self.hostname}:{self.port}/api/job/{parent_id}/send_subjob_update_notification', json=subjob.json()) # -------------------------------------------- # Engines: # -------------------------------------------- def is_engine_available(self, engine_name): return self.request_data(f'{engine_name}/is_available') def get_all_engines(self): # todo: this doesnt work return self.request_data('all_engines') def get_engine_info(self, response_type='standard', timeout=5): """ Fetches engine information from the server. Args: response_type (str, optional): Returns standard or full version of engine info timeout (int, optional): The number of seconds to wait for a response from the server. Defaults to 5. Returns: dict: A dictionary containing the engine information. """ all_data = self.request_data(f"engine_info?response_type={response_type}", timeout=timeout) return all_data def delete_engine(self, engine, version, system_cpu=None): """ Sends a request to the server to delete a specific engine. Args: engine (str): The name of the engine to delete. version (str): The version of the engine to delete. system_cpu (str, optional): The system CPU type. Defaults to None. Returns: Response: The response from the server. """ form_data = {'engine': engine, 'version': version, 'system_cpu': system_cpu} return requests.post(f'http://{self.hostname}:{self.port}/api/delete_engine', json=form_data) # -------------------------------------------- # Download Files: # -------------------------------------------- def download_all_job_files(self, job_id, save_path): url = f"http://{self.hostname}:{self.port}/api/job/{job_id}/download_all" return self.__download_file_from_url(url, output_filepath=save_path) def download_job_file(self, job_id, job_filename, save_path): url = f"http://{self.hostname}:{self.port}/api/job/{job_id}/download?filename={job_filename}" return self.__download_file_from_url(url, output_filepath=save_path) @staticmethod def __download_file_from_url(url, output_filepath): with requests.get(url, stream=True) as r: r.raise_for_status() with open(output_filepath, 'wb') as f: for chunk in r.iter_content(chunk_size=8192): f.write(chunk) return output_filepath