Files
cross-py-builder/agent_manager.py
Brett Williams 0f487640bd Argparse cleanup
2025-03-06 10:07:19 -06:00

305 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import concurrent.futures
import os
import socket
import tempfile
import time
import zipfile
from datetime import datetime, timedelta
from packaging.version import Version
from tabulate import tabulate
import requests
from build_agent import build_agent_version
from zeroconf_server import ZeroconfServer
TIMEOUT = 10
def find_server_ips():
    """Discover build-server addresses via zeroconf and merge in known_hosts.

    Returns:
        list[str]: de-duplicated hostnames/IPs, zeroconf discoveries first.
    """
    ZeroconfServer.configure("_crosspybuilder._tcp.local.", socket.gethostname(), 9001)
    hostnames = []
    try:
        ZeroconfServer.start(listen_only=True)
        time.sleep(.3) # give it time to find network
        hostnames = ZeroconfServer.found_ip_addresses()
    except KeyboardInterrupt:
        pass
    finally:
        ZeroconfServer.stop()
    # Merge in statically configured hosts ("host[:rest]" per line).
    # The original crashed with FileNotFoundError when no known_hosts
    # file existed; treat a missing file as "no static hosts".
    try:
        with open("known_hosts", "r") as file:
            for line in file:
                host = line.split(':')[0].strip()
                if host:  # skip blank lines
                    hostnames.append(host)
    except FileNotFoundError:
        pass
    # De-duplicate while preserving discovery order.
    return list(dict.fromkeys(hostnames))
def get_all_servers_status():
    """Query every discovered server for its status, in parallel."""
    with concurrent.futures.ThreadPoolExecutor() as pool:
        # map() preserves input order and runs the requests concurrently.
        statuses = pool.map(get_worker_status, find_server_ips())
        return list(statuses)
def get_worker_status(hostname):
    """Fetch worker status from the given hostname.

    Returns the server's /status JSON payload, or a minimal
    ``{"hostname": ..., "status": "offline"}`` dict on any failure.
    """
    try:
        response = requests.get(f"http://{hostname}:9001/status", timeout=TIMEOUT)
        status = response.json()
        # If we reached the server via a name it does not report, record the
        # resolved IP so later requests can address it directly.
        if status['hostname'] != hostname and status['ip'] != hostname:
            status['ip'] = socket.gethostbyname(hostname)
        return status
    except (requests.exceptions.RequestException, ValueError, KeyError, socket.gaierror) as e:
        # ValueError: non-JSON body; KeyError: payload missing hostname/ip;
        # gaierror: name resolution failed. Previously these escaped and
        # crashed the whole parallel status sweep.
        print(f"EXCEPTION: {e}")
        return {"hostname": hostname, "status": "offline"}
def zip_project(source_dir, output_zip):
    """Zips the given directory."""
    excluded_dirs = {"venv", ".venv", "dist", "build"}
    with zipfile.ZipFile(output_zip, 'w', zipfile.ZIP_DEFLATED) as archive:
        for root, subdirs, filenames in os.walk(source_dir):
            # Prune excluded directories in place so os.walk never descends
            # into them (case-insensitive match).
            subdirs[:] = [name for name in subdirs if name.lower() not in excluded_dirs]
            for filename in filenames:
                full_path = os.path.join(root, filename)
                arcname = os.path.relpath(full_path, source_dir)
                archive.write(full_path, arcname)
def send_build_request(zip_file, server_ip, download_after=False):
    """Uploads the zip file to the given server."""
    upload_url = f"http://{server_ip}:9001/upload"
    print(f"Submitting build request to URL: {upload_url} - Please wait. This may take a few minutes...")
    with open(zip_file, 'rb') as f:
        response = requests.post(upload_url, files={"file": f})
    # Guard clause: bail out early on any non-OK response.
    if response.status_code != 200:
        print("Upload failed:", response.status_code, response.text)
        return
    response_data = response.json()
    print(f"Build successful. ID: {response_data['id']} Hostname: {response_data['hostname']} OS: {response_data['os']} CPU: {response_data['cpu']} Spec files: {len(response_data['spec_files'])} - Elapsed time: {response_data['duration']}" )
    if download_after:
        download_url = f"http://{server_ip}:9001/download/{response_data.get('id')}"
        try:
            save_name = f"{os.path.splitext(os.path.basename(zip_file))[0]}-{response_data['os'].lower()}-{response_data['cpu'].lower()}.zip"
            download_zip(download_url, save_name=save_name)
        except Exception as e:
            print(f"Error downloading zip: {e}")
def download_zip(url, save_name, save_dir="."):
    """Download a ZIP file from a URL and save it with its original filename."""
    response = requests.get(url, stream=True)
    response.raise_for_status()  # Raise an error if request fails
    save_path = os.path.join(save_dir, save_name)
    # Stream the body to disk in fixed-size chunks to bound memory use.
    with open(save_path, "wb") as out_file:
        for chunk in response.iter_content(chunk_size=8192):
            out_file.write(chunk)
    print(f"Saved binaries to file: {save_path}")
    return save_path
def select_server(servers, cpu=None, os_name=None):
    """Selects a build server based on CPU architecture and OS filters."""
    # Chain lazy filters; only "ready" servers are ever considered.
    candidates = (s for s in servers if s["status"] == "ready")
    if cpu:
        wanted_cpu = cpu.lower()
        candidates = (s for s in candidates if wanted_cpu in s["cpu"].lower())
    if os_name:
        wanted_os = os_name.lower()
        candidates = (s for s in candidates if wanted_os in s["os"].lower())
    # First matching server, or None when nothing qualifies.
    return next(candidates, None)
def process_new_job(args, server_data):
    """Zip the project at ``args.build`` and submit it to every matching ready server.

    Args:
        args: parsed CLI namespace; uses ``build``, ``cpu``, ``os``, ``download``.
        server_data: list of status dicts as returned by get_all_servers_status().
    """
    available_servers = [s for s in server_data if s["status"] == "ready"]
    if args.cpu:
        available_servers = [s for s in available_servers if args.cpu.lower() in s["cpu"].lower()]
    if args.os:
        available_servers = [s for s in available_servers if args.os.lower() in s["os"].lower()]
    if not available_servers:
        print("No available servers matching the criteria.")
        return
    # Keep only one server per (cpu, os) combination; building the same
    # target twice would produce duplicate artifacts.
    unique_servers = {}
    for server in available_servers:
        unique_servers.setdefault((server["cpu"], server["os"]), server)
    available_servers = list(unique_servers.values())
    print(f"Found {len(available_servers)} servers available to build")
    print(tabulate(available_servers, headers="keys", tablefmt="grid"))
    project_path = args.build
    tmp_dir = tempfile.gettempdir()
    zip_file = os.path.join(tmp_dir, f"{os.path.basename(project_path)}.zip")
    zip_project(project_path, zip_file)
    print(f"Zipped {project_path} to {zip_file}")
    try:
        # Start builds on all matching servers in parallel.
        with concurrent.futures.ThreadPoolExecutor() as executor:
            print("Submitting builds to:")
            for server in available_servers:
                print(f"\t{server['hostname']} - {server['os']} - {server['cpu']}")
            futures = {executor.submit(send_build_request, zip_file, server["ip"], args.download): server
                       for server in available_servers}
            # Surface per-server failures without aborting the remaining builds.
            for future in concurrent.futures.as_completed(futures):
                server = futures[future]
                try:
                    future.result()  # send_build_request prints its own output
                except Exception as e:
                    print(f"Build failed on {server['hostname']}: {e}")
    finally:
        # Always remove the temporary zip, even if submission raised.
        try:
            os.remove(zip_file)
        except OSError as e:
            print(f"Error removing zip file: {e}")
def delete_cache(server_data):
    """Ask every ready server to clear its build cache.

    Failures on one server are reported and do not stop the sweep.
    """
    available_servers = [s for s in server_data if s["status"] == "ready"]
    # Message previously read "cache in from all" (garbled).
    print(f"Deleting cache from all available servers ({len(available_servers)})")
    for server in available_servers:
        try:
            # Bounded timeout so one hung server cannot stall the whole sweep.
            response = requests.get(f"http://{server['ip']}:9001/delete_cache", timeout=TIMEOUT)
            response.raise_for_status()
            print(f"Cache cleared on {server['hostname']}")
        except Exception as e:
            print(f"Error deleting cache on {server['hostname']}: {e}")
def update_worker(server):
    """Push the current build_agent.py and requirements.txt to one worker.

    Returns:
        The server dict on success (so callers can verify the restart),
        or None on any failure.
    """
    try:
        print(f"Updating {server['hostname']} from {server.get('agent_version')} => {build_agent_version}")
        # Reuse the context-managed handles. The original re-opened both
        # files inside the dict, leaking two unclosed file objects per call.
        with open("build_agent.py", "rb") as file1, open("requirements.txt", "rb") as file2:
            update_files = {
                "file1": file1,
                "file2": file2
            }
            response = requests.post(f"http://{server['ip']}:9001/update", files=update_files)
        response.raise_for_status()
        response_json = response.json()
        if response_json.get('updated_files') and not response_json.get('error_files'):
            try:
                # The agent drops the connection while restarting; expected.
                requests.get(f"http://{server['ip']}:9001/restart", timeout=TIMEOUT)
            except requests.exceptions.ConnectionError:
                pass
            return server
        else:
            print(f"Error updating {server['hostname']}. Errors: {response_json.get('error_files')} - Updated: {response_json.get('updated_files')}")
    except Exception as e:
        print(f"Unhandled error updating {server['hostname']}: {e}")
    return None
def update_build_workers(server_data):
    """Upgrade every ready worker running an older agent, then verify restarts.

    Args:
        server_data: status dicts from get_all_servers_status().
    """
    available_workers = [s for s in server_data if s["status"] == "ready"]
    # Treat a missing/empty agent_version as "0" so agents that predate
    # version reporting get upgraded instead of crashing Version().
    workers_to_update = [x for x in available_workers
                         if Version(x.get('agent_version') or "0") < Version(build_agent_version)]
    if not workers_to_update:
        print(f"All {len(available_workers)} workers up to date")
        return
    print(f"Updating workers on all available servers ({len(workers_to_update)}) to {build_agent_version}")
    updated_servers = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {executor.submit(update_worker, server): server for server in workers_to_update}
        for future in concurrent.futures.as_completed(futures):
            server = future.result()  # update_worker returns None on failure
            if server:
                updated_servers.append(server)
    if updated_servers:
        print("Waiting for servers to restart...")
        unverified_servers = {server["ip"]: server for server in updated_servers}
        end_time = datetime.now() + timedelta(seconds=30)
        while unverified_servers and datetime.now() < end_time:
            for server_ip in list(unverified_servers.keys()):  # Iterate over a copy to avoid modification issues
                try:
                    # Bounded timeout so one wedged server can't eat the
                    # whole 30-second verification budget.
                    response = requests.get(f"http://{server_ip}:9001/status", timeout=TIMEOUT)
                    response.raise_for_status()
                    server_info = unverified_servers[server_ip]  # Get full server details
                    agent_version = response.json().get('agent_version')
                    if agent_version == build_agent_version:
                        print(f"Agent on {server_info['hostname']} successfully upgraded to {build_agent_version}")
                    else:
                        print(f"Agent on {server_info['hostname']} failed to upgrade. Still on version {agent_version}")
                    unverified_servers.pop(server_ip)
                except requests.exceptions.RequestException:
                    # Still restarting (connection refused, timeout, or a
                    # transient error status). Previously only ConnectionError
                    # was caught and any other request error crashed the loop.
                    pass
            if unverified_servers:
                time.sleep(1)  # Short delay before retrying to avoid spamming requests
        if unverified_servers:
            print("Some servers did not restart in time:", list(unverified_servers.keys()))
    print("Update complete")
def main():
    """CLI entry point: dispatch exactly one management action from the arguments."""
    parser = argparse.ArgumentParser(description="Build agent manager for cross-py-builder")
    parser.add_argument("--status", action="store_true", help="Get status of available servers")
    parser.add_argument("--build", type=str, help="Path to the project to build")
    parser.add_argument("-cpu", type=str, help="CPU architecture")
    parser.add_argument("-os", type=str, help="Operating system")
    parser.add_argument("-d", '--download', action="store_true", help="Download after build")
    parser.add_argument("--delete-cache", action="store_true", help="Delete cache")
    parser.add_argument("--update-all", action="store_true", help="Update build agent")
    parser.add_argument("--restart", type=str, help="Hostname to restart")
    parser.add_argument("--restart-all", action="store_true", help="Restart all agents")
    parser.add_argument("--shutdown", type=str, help="Hostname to shutdown")
    # Help text previously said "Restart all agents" (copy-paste error).
    parser.add_argument("--shutdown-all", action="store_true", help="Shutdown all agents")
    args = parser.parse_args()
    if args.status:
        print(tabulate(get_all_servers_status(), headers="keys", tablefmt="grid"))
        return
    elif args.restart:
        restart_agent(args.restart)
    elif args.restart_all:
        print("Restarting all agents...")
        for server_ip in find_server_ips():
            restart_agent(server_ip)
    elif args.shutdown:
        shutdown_agent(args.shutdown)
    elif args.shutdown_all:
        print("Shutting down all agents...")
        for server_ip in find_server_ips():
            shutdown_agent(server_ip)
    elif args.delete_cache:
        delete_cache(get_all_servers_status())
    elif args.build:
        process_new_job(args, get_all_servers_status())
    elif args.update_all:
        update_build_workers(get_all_servers_status())
    else:
        # Previously printed the misleading "No path given!" when no action
        # flag at all was supplied.
        print("No action specified.")
        parser.print_help()
def shutdown_agent(hostname):
    """Ask the agent at *hostname* to shut down; a dropped connection is expected."""
    print(f"Shutting down hostname: {hostname}")
    endpoint = f"http://{hostname}:9001/shutdown"
    try:
        requests.get(endpoint, timeout=TIMEOUT)
    except (requests.exceptions.ConnectionError, TimeoutError):
        pass  # agent exits before replying; that's fine
def restart_agent(hostname):
    """Ask the agent at *hostname* to restart; a dropped connection is expected."""
    print(f"Restarting agent: {hostname}")
    endpoint = f"http://{hostname}:9001/restart"
    try:
        requests.get(endpoint, timeout=TIMEOUT)
    except (requests.exceptions.ConnectionError, TimeoutError):
        pass  # agent restarts before replying; that's fine
# Allow importing this module (e.g. for reuse of its helpers) without
# triggering the CLI.
if __name__ == "__main__":
    main()