diff --git a/.gitignore b/.gitignore index f3206bb..743890c 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,4 @@ *.pyc /server_state.json /.scheduler_prefs -/database.db +*.db diff --git a/lib/client/dashboard_window.py b/lib/client/dashboard_window.py index 4e8519a..23dafd4 100644 --- a/lib/client/dashboard_window.py +++ b/lib/client/dashboard_window.py @@ -38,9 +38,8 @@ class DashboardWindow: # Create a Treeview widget self.root = tk.Tk() self.root.title("Zordon Dashboard") - self.local_host = socket.gethostname() - self.server_proxy = RenderServerProxy(hostname=self.local_host) - self.job_cache = [] + self.current_hostname = None + self.server_proxies = {} self.added_hostnames = [] # Setup zeroconf @@ -62,17 +61,20 @@ class DashboardWindow: left_frame.pack(side=tk.LEFT, expand=True, fill=tk.BOTH) self.server_tree = ttk.Treeview(left_frame, show="headings") self.server_tree.pack(expand=True, fill=tk.BOTH) - self.server_tree["columns"] = ("Server") - self.server_tree.column("Server", width=50) + self.server_tree["columns"] = ("Server", "Status") self.server_tree.bind("<>", self.server_picked) self.server_tree.column("Server", width=200) + self.server_tree.column("Status", width=80) left_button_frame = tk.Frame(left_frame) - left_button_frame.pack(side=tk.BOTTOM, fill=tk.X, expand=False) + left_button_frame.pack(side=tk.BOTTOM, fill=tk.X, padx=5, pady=5, expand=False) # Create buttons - add_server_button = tk.Button(left_button_frame, text="Add Server", command=self.add_server_button) - add_server_button.pack(side=tk.RIGHT, padx=5, pady=5) + self.remove_server_button = tk.Button(left_button_frame, text="-", command=self.remove_server_button) + self.remove_server_button.pack(side=tk.RIGHT) + self.remove_server_button.config(state='disabled') + add_server_button = tk.Button(left_button_frame, text="+", command=self.add_server_button) + add_server_button.pack(side=tk.RIGHT) # Create separator separator = ttk.Separator(server_frame, orient=tk.VERTICAL) @@ -81,7 +83,7 @@ class DashboardWindow: # Setup the Tree self.job_tree = ttk.Treeview(server_frame, show="headings") self.job_tree.tag_configure('running', background='lawn green', font=('', 0, 'bold')) - self.job_tree.bind("<>", self.on_row_select) + self.job_tree.bind("<>", self.job_picked) self.job_tree["columns"] = ("id", "Name", "Renderer", "Priority", "Status", "Time Elapsed", "Frames", "Date Added", "Owner", "") @@ -131,19 +133,21 @@ class DashboardWindow: make_sortable(self.job_tree) make_sortable(self.server_tree) - # update jobs - self.update_jobs() - try: - selected_job = self.job_tree.get_children()[0] - self.job_tree.selection_set(selected_job) - except IndexError: - pass - # update servers self.update_servers() try: selected_server = self.server_tree.get_children()[0] self.server_tree.selection_set(selected_server) + self.server_picked() + except IndexError: + pass + + # update jobs + self.update_jobs() + try: + selected_job = self.job_tree.get_children()[0] + self.job_tree.selection_set(selected_job) + self.job_picked() except IndexError: pass @@ -152,6 +156,19 @@ class DashboardWindow: x.daemon = True x.start() + @property + def current_server_proxy(self): + return self.server_proxies.get(self.current_hostname, None) + + def remove_server_button(self): + new_hostname = self.server_tree.selection()[0] + if new_hostname in self.added_hostnames: + self.added_hostnames.remove(new_hostname) + self.update_servers() + if self.server_tree.get_children(): + self.server_tree.selection_set(self.server_tree.get_children()[0]) + self.server_picked(event=None) + def add_server_button(self): hostname = simpledialog.askstring("Server Hostname", "Enter the server hostname to add:") if hostname: @@ -159,19 +176,20 @@ class DashboardWindow: if hostname not in self.added_hostnames: if RenderServerProxy(hostname=hostname).connect(): self.added_hostnames.append(hostname) + self.update_servers() else: messagebox.showerror("Cannot Connect", f"Cannot connect to server at hostname: '{hostname}'") - def server_picked(self, event): + def server_picked(self, event=None): try: new_hostname = self.server_tree.selection()[0] - if self.server_proxy.hostname == new_hostname: + self.remove_server_button.config(state="normal" if new_hostname in self.added_hostnames else "disabled") + if self.current_hostname == new_hostname: return - self.server_proxy.hostname = new_hostname + self.current_hostname = new_hostname + self.update_jobs(clear_table=True) except IndexError: pass - self.job_cache.clear() - self.update_jobs(clear_table=True) def selected_job_ids(self): selected_items = self.job_tree.selection() # Get the selected item @@ -182,13 +200,13 @@ class DashboardWindow: def stop_job(self): job_ids = self.selected_job_ids() for job_id in job_ids: - self.server_proxy.request_data(f'job/{job_id}/cancel?confirm=true') + self.current_server_proxy.request_data(f'job/{job_id}/cancel?confirm=true') self.update_jobs(clear_table=True) def delete_job(self): job_ids = self.selected_job_ids() if len(job_ids) == 1: - job = next((d for d in self.job_cache if d.get('id') == job_ids[0]), None) + job = next((d for d in self.current_server_proxy.get_jobs() if d.get('id') == job_ids[0]), None) display_name = job['name'] or os.path.basename(job['input_path']) message = f"Are you sure you want to delete the job:\n{display_name}?" else: @@ -197,8 +215,7 @@ class DashboardWindow: result = messagebox.askyesno("Confirmation", message) if result: for job_id in job_ids: - self.server_proxy.request_data(f'job/{job_id}/delete?confirm=true') - self.job_cache.clear() + self.current_server_proxy.request_data(f'job/{job_id}/delete?confirm=true') self.update_jobs(clear_table=True) def set_image(self, image): @@ -207,22 +224,24 @@ class DashboardWindow: self.photo_label.configure(image=thumb_image) self.photo_label.image = thumb_image - def on_row_select(self, event): + def job_picked(self, event=None): job_id = self.selected_job_ids()[0] if self.selected_job_ids() else None if job_id: # update thumb def fetch_preview(): - hostname = self.server_proxy.hostname - response = self.server_proxy.request(f'job/{job_id}/thumbnail?size=big') - if response.ok: - try: + try: + before_fetch_hostname = self.current_server_proxy.hostname + response = self.current_server_proxy.request(f'job/{job_id}/thumbnail?size=big') + if response.ok: import io image_data = response.content image = Image.open(io.BytesIO(image_data)) - if self.server_proxy.hostname == hostname and job_id == self.selected_job_ids()[0]: + if self.current_server_proxy.hostname == before_fetch_hostname and job_id == self.selected_job_ids()[0]: self.set_image(image) - except Exception as e: - logger.error(f"error fetching image: {e}") + except ConnectionError as e: + logger.error(f"Connection error fetching image: {e}") + except Exception as e: + logger.error(f"Error fetching image: {e}") fetch_thread = threading.Thread(target=fetch_preview) fetch_thread.daemon = True @@ -231,7 +250,7 @@ class DashboardWindow: self.set_image(self.default_image) # update button status - job = next((d for d in self.job_cache if d.get('id') == job_id), None) + job = next((d for d in self.current_server_proxy.get_jobs() if d.get('id') == job_id), None) stop_button_state = 'normal' if job and job['status'] == 'running' else 'disabled' self.stop_button.config(state=stop_button_state) @@ -243,7 +262,7 @@ class DashboardWindow: def show_files(self): output_path = None if self.selected_job_ids(): - job = next((d for d in self.job_cache if d.get('id') == self.selected_job_ids()[0]), None) + job = next((d for d in self.current_server_proxy.get_jobs() if d.get('id') == self.selected_job_ids()[0]), None) output_path = os.path.dirname(job['output_path']) # check local filesystem if not os.path.exists(output_path): output_path = file_exists_in_mounts(output_path) # check any attached network shares @@ -254,7 +273,7 @@ class DashboardWindow: def open_logs(self): if self.selected_job_ids(): - url = f'http://{self.server_proxy.hostname}:{self.server_proxy.port}/api/job/{self.selected_job_ids()[0]}/logs' + url = f'http://{self.current_server_proxy.hostname}:{self.current_server_proxy.port}/api/job/{self.selected_job_ids()[0]}/logs' launch_url(url) def mainloop(self): @@ -262,64 +281,90 @@ class DashboardWindow: def __background_update(self): while True: - self.update_jobs() self.update_servers() - time.sleep(3) + self.update_jobs() + time.sleep(1) def update_servers(self): - servers = list(set(self.zeroconf.found_clients() + self.added_hostnames)) - if len(servers) < len(self.server_tree.get_children()): - self.server_tree.delete(*self.server_tree.get_children()) - for hostname in servers: + + def update_row(tree, id, new_values, tags=None): + for item in tree.get_children(): + values = tree.item(item, "values") + if values[0] == id: + if tags: + tree.item(item, values=new_values, tags=tags) + else: + tree.item(item, values=new_values) + break + + current_servers = list(set(self.zeroconf.found_clients() + self.added_hostnames)) + for hostname in current_servers: + if not self.server_proxies.get(hostname, None): + new_proxy = RenderServerProxy(hostname=hostname) + new_proxy.start_background_update() + self.server_proxies[hostname] = new_proxy + + for hostname, proxy in self.server_proxies.items(): if hostname not in self.server_tree.get_children(): - self.server_tree.insert("", tk.END, iid=hostname, values=(hostname,)) + self.server_tree.insert("", tk.END, iid=hostname, values=(hostname, proxy.status(), )) + else: + update_row(self.server_tree, hostname, new_values=(hostname, proxy.status())) + + # remove any servers that don't belong + for row in self.server_tree.get_children(): + if row not in current_servers: + self.server_tree.delete(row) + proxy = self.server_proxies.get(row, None) + if proxy: + proxy.stop_background_update() + self.server_proxies.pop(row) def update_jobs(self, clear_table=False): - def update_jobs_inner(): - def update_row(tree, id, new_values, tags=None): - for item in tree.get_children(): - values = tree.item(item, "values") - if values[0] == id: - tree.item(item, values=new_values, tags=tags) - break + if not self.current_server_proxy: + return - hostname = self.server_proxy.hostname - job_fetch = self.server_proxy.get_jobs(ignore_token=clear_table) - # have to check hostname is still valid because of delay in fetching jobs - if hostname == self.server_proxy.hostname: - if job_fetch is not None: - if len(job_fetch) < len(self.job_cache) or len(job_fetch) == 0: - self.job_tree.delete(*self.job_tree.get_children()) - self.job_cache = job_fetch # update the cache only if its good data - for job in self.job_cache: - display_status = job['status'] if job['status'] != 'running' else \ - ('%.0f%%' % (job['percent_complete'] * 100)) # if running, show percent, otherwise just show status - tags = (job['status'],) - values = (job['id'], - job['name'] or os.path.basename(job['input_path']), - job['renderer'] + "-" + job['renderer_version'], - job['priority'], - display_status, - get_time_elapsed(datetime.datetime.fromisoformat(job['start_time']), - datetime.datetime.fromisoformat(job['end_time'])), - job['total_frames'], - job['date_created'], - job['owner']) - try: - if self.job_tree.exists(job['id']): - update_row(self.job_tree, job['id'], new_values=values, tags=tags) - else: - self.job_tree.insert("", tk.END, iid=job['id'], values=values, tags=tags) - except tk.TclError: - pass + def update_row(tree, id, new_values, tags=None): + for item in tree.get_children(): + values = tree.item(item, "values") + if values[0] == id: + tree.item(item, values=new_values, tags=tags) + break if clear_table: self.job_tree.delete(*self.job_tree.get_children()) - x = threading.Thread(target=update_jobs_inner) - x.daemon = True - x.start() + job_fetch = self.current_server_proxy.get_jobs(ignore_token=clear_table) + if job_fetch: + for job in job_fetch: + display_status = job['status'] if job['status'] != 'running' else \ + ('%.0f%%' % (job['percent_complete'] * 100)) # if running, show percent, otherwise just show status + tags = (job['status'],) + start_time = datetime.datetime.fromisoformat(job['start_time']) if job['start_time'] else None + end_time = datetime.datetime.fromisoformat(job['end_time']) if job['end_time'] else None + + values = (job['id'], + job['name'] or os.path.basename(job['input_path']), + job['renderer'] + "-" + job['renderer_version'], + job['priority'], + display_status, + get_time_elapsed(start_time, end_time), + job['total_frames'], + job['date_created'], + job['owner']) + try: + if self.job_tree.exists(job['id']): + update_row(self.job_tree, job['id'], new_values=values, tags=tags) + else: + self.job_tree.insert("", tk.END, iid=job['id'], values=values, tags=tags) + except tk.TclError: + pass + + # remove any jobs that don't belong + all_job_ids = [job['id'] for job in job_fetch] + for row in self.job_tree.get_children(): + if row not in all_job_ids: + self.job_tree.delete(row) def show_new_job_window(self): new_window = tk.Toplevel(self.root) diff --git a/lib/server/server_proxy.py b/lib/server/server_proxy.py index 4290f09..051fa8a 100644 --- a/lib/server/server_proxy.py +++ b/lib/server/server_proxy.py @@ -2,6 +2,8 @@ import logging import os import json import requests +import time +import threading from lib.render_workers.base_worker import RenderStatus from requests_toolbelt.multipart import MultipartEncoder, MultipartEncoderMonitor @@ -13,15 +15,21 @@ categories = [RenderStatus.RUNNING, RenderStatus.ERROR, RenderStatus.NOT_STARTED RenderStatus.COMPLETED, RenderStatus.CANCELLED, RenderStatus.UNDEFINED] logger = logging.getLogger() +OFFLINE_MAX = 2 class RenderServerProxy: - def __init__(self, hostname=None, server_port="8080"): + def __init__(self, hostname, server_port="8080"): self._hostname = hostname self.port = server_port self.fetched_status_data = None self.__jobs_cache_token = None + self.__jobs_cache = [] + self.__update_in_background = False + self.__background_thread = None + self.__offline_flags = 0 + self.update_cadence = 5 @property def hostname(self): @@ -36,15 +44,29 @@ class RenderServerProxy: status = self.request_data('status') return status + def is_online(self): + if self.__update_in_background: + return self.__offline_flags < OFFLINE_MAX + else: + return self.connect() is not None + + def status(self): + if not self.is_online(): + return "Offline" + running_jobs = [x for x in self.__jobs_cache if x['status'] == 'running'] if self.__jobs_cache else [] + return f"{len(running_jobs)} running" if running_jobs else "Available" + def request_data(self, payload, timeout=5): try: req = self.request(payload, timeout) if req.ok and req.status_code == 200: + self.__offline_flags = 0 return req.json() except json.JSONDecodeError as e: logger.debug(f"JSON decode error: {e}") except requests.ConnectionError as e: logger.error(f"Connection error: {e}") + self.__offline_flags = self.__offline_flags + 1 except Exception as e: logger.exception(f"Uncaught exception: {e}") return None @@ -52,19 +74,37 @@ class RenderServerProxy: def request(self, payload, timeout=5): return requests.get(f'http://{self.hostname}:{self.port}/api/{payload}', timeout=timeout) + def start_background_update(self): + self.__update_in_background = True + + def thread_worker(): + while self.__update_in_background: + self.__update_job_cache() + time.sleep(self.update_cadence) + + self.__background_thread = threading.Thread(target=thread_worker) + self.__background_thread.daemon = True + self.__background_thread.start() + + def stop_background_update(self): + self.__update_in_background = False + def get_jobs(self, timeout=5, ignore_token=False): + if not self.__update_in_background: + self.__update_job_cache(timeout, ignore_token) + return self.__jobs_cache.copy() if self.__jobs_cache else None + + def __update_job_cache(self, timeout=5, ignore_token=False): url = f'jobs?token={self.__jobs_cache_token}' if self.__jobs_cache_token and not ignore_token else 'jobs' status_result = self.request_data(url, timeout=timeout) - all_jobs = None if status_result is not None: sorted_jobs = [] for status_category in categories: found_jobs = [x for x in status_result['jobs'] if x['status'] == status_category.value] if found_jobs: sorted_jobs.extend(found_jobs) - all_jobs = sorted_jobs + self.__jobs_cache = sorted_jobs self.__jobs_cache_token = status_result['token'] - return all_jobs def get_data(self, timeout=5): all_data = self.request_data('full_status', timeout=timeout) diff --git a/requirements.txt b/requirements.txt index 633b60e..0952de9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,13 +3,14 @@ requests==2.31.0 psutil==5.9.5 PyYAML~=6.0 Flask==2.3.2 -rich==13.3.5 +rich==13.4.1 ffmpeg-python -Werkzeug==2.3.4 +Werkzeug==2.3.5 tkinterdnd2~=0.3.0 future==0.18.3 json2html~=1.3.0 SQLAlchemy~=2.0.15 -Pillow~=9.3.0 -zeroconf~=0.63.0 -requests-toolbelt~=1.0 \ No newline at end of file +Pillow==9.5.0 +zeroconf==0.64.1 +requests-toolbelt~=1.0 +arrow~=1.2.3 \ No newline at end of file