mirror of
https://github.com/blw1138/cross-py-builder.git
synced 2025-12-17 08:38:11 +00:00
Another attempt #2
This commit is contained in:
339
cross_py_builder/agent_manager.py
Executable file
339
cross_py_builder/agent_manager.py
Executable file
@@ -0,0 +1,339 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import argparse
|
||||
import concurrent.futures
|
||||
import os
|
||||
import socket
|
||||
import tempfile
|
||||
import time
|
||||
import zipfile
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
import requests
|
||||
from packaging.version import Version
|
||||
from tabulate import tabulate
|
||||
|
||||
from build_agent import build_agent_version
|
||||
from zeroconf_server import ZeroconfServer
|
||||
|
||||
DEFAULT_PORT = 9001
|
||||
TIMEOUT = 10
|
||||
|
||||
def find_server_ips():
    """Discover build-agent hosts via zeroconf plus the local known_hosts file.

    Returns a list of "host" or "host:port" strings. Entries read from
    known_hosts are whitespace-stripped (the original appended raw readlines,
    so trailing newlines leaked into the hostnames) and blank lines skipped.
    """
    ZeroconfServer.configure("_crosspybuilder._tcp.local.", socket.gethostname(), DEFAULT_PORT)
    hostnames = []
    try:
        ZeroconfServer.start(listen_only=True)
        time.sleep(.3)  # give it time to find network
        hostnames = ZeroconfServer.found_ip_addresses()
    except KeyboardInterrupt:
        pass
    finally:
        ZeroconfServer.stop()

    # Merge in statically-configured hosts; the file is optional.
    try:
        with open("../known_hosts", "r") as file:
            hostnames.extend(line.strip() for line in file if line.strip())
    except FileNotFoundError:
        pass  # no known_hosts file is a valid configuration

    return hostnames
|
||||
|
||||
|
||||
def get_all_servers_status():
    """Query every discovered server's /status endpoint concurrently.

    Returns a list of status dicts, one per server (offline servers included
    as minimal records produced by get_worker_status).
    """
    table_data = []
    server_ips = find_server_ips()

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []
        for server in server_ips:
            # Entries may come from known_hosts with surrounding whitespace;
            # strip before parsing so "host\n" doesn't end up in the URL.
            server = server.strip()
            ip = server.split(':')[0]
            # "host:port" or bare "host" (then fall back to the default port).
            port = server.split(":")[-1].strip() if ":" in server else DEFAULT_PORT
            futures.append(executor.submit(get_worker_status, ip, port))

        for future in concurrent.futures.as_completed(futures):
            try:
                result = future.result()  # Get the result of the thread
                table_data.append(result)
            except Exception as e:
                print(f"Error fetching status from server: {e}")  # Handle potential errors
    return table_data
|
||||
|
||||
|
||||
def get_worker_status(hostname, port=DEFAULT_PORT):
    """Fetch worker status from the given hostname.

    Returns the agent's /status JSON augmented with the port used, or a
    minimal "offline" record when the host cannot be reached.
    """
    try:
        status = requests.get(f"http://{hostname}:{port}/status", timeout=TIMEOUT).json()
        status['port'] = port
        # When addressed by a name the agent doesn't report, record the resolved IP.
        if hostname not in (status['hostname'], status['ip']):
            status['ip'] = socket.gethostbyname(hostname)
        return status
    except requests.exceptions.RequestException:
        return {"hostname": hostname, "port": port, "status": "offline"}
|
||||
|
||||
|
||||
def zip_project(source_dir, output_zip):
    """Zips the given directory."""
    skip = {"venv", ".venv", "dist", "build"}
    with zipfile.ZipFile(output_zip, 'w', zipfile.ZIP_DEFLATED) as archive:
        for root, dirs, files in os.walk(source_dir):
            # Prune virtualenvs and build output in place so os.walk never descends.
            dirs[:] = [name for name in dirs if name.lower() not in skip]
            for name in files:
                full_path = os.path.join(root, name)
                archive.write(full_path, os.path.relpath(full_path, source_dir))
|
||||
|
||||
|
||||
def send_build_request(server_ip, server_port=DEFAULT_PORT, zip_file=None, git_url=None, download_after=True, version=None):
    """Submit a build (zip upload or git checkout) to one agent, optionally downloading the result.

    Exactly one of zip_file / git_url must be provided; raises ValueError otherwise.
    """
    if zip_file:
        upload_url = f"http://{server_ip}:{server_port}/upload"
        print(f"Submitting build request to URL: {upload_url} - Please wait. This may take a few minutes...")
        with open(zip_file, 'rb') as f:
            response = requests.post(upload_url, files={"file": f})
    elif git_url:
        checkout_url = f"http://{server_ip}:{server_port}/checkout_git"
        response = requests.post(checkout_url, json={"repo_url": git_url})
    else:
        raise ValueError("Missing zip file or git url!")

    if response.status_code != 200:
        print("Upload failed:", response.status_code, response.text)
        return

    response_data = response.json()
    print(response_data)
    print(f"Build successful. ID: {response_data['id']} Hostname: {response_data['hostname']} OS: {response_data['os']} CPU: {response_data['cpu']} Spec files: {len(response_data['spec_files'])} - Elapsed time: {response_data['duration']}" )
    if download_after:
        download_url = f"http://{server_ip}:{server_port}/download/{response_data.get('id')}"
        try:
            # Derive the artifact name from the zip, or from the repo name for
            # git builds (the original crashed here on git checkouts because
            # zip_file was None).
            source_name = zip_file if zip_file else git_url.rstrip('/')
            base_name = os.path.splitext(os.path.basename(source_name))[0]
            version_string = ("-" + version) if version else ""
            save_name = f"{base_name}{version_string}-{response_data['os'].lower()}-{response_data['cpu'].lower()}.zip"
            download_zip(download_url, save_name=save_name)
        except Exception as e:
            print(f"Error downloading zip: {e}")
|
||||
|
||||
|
||||
def download_zip(url, save_name, save_dir="."):
    """Download a ZIP file from a URL and save it with its original filename.

    Returns the saved path; raises requests.HTTPError on a bad response.
    """
    save_path = os.path.join(save_dir, save_name)
    # Stream to disk in chunks so large artifacts never sit fully in memory.
    # The context manager releases the connection (the original leaked the
    # streamed response), and the timeout stops a dead server hanging us.
    with requests.get(url, stream=True, timeout=TIMEOUT) as response:
        response.raise_for_status()  # Raise an error if request fails
        with open(save_path, "wb") as file:
            for chunk in response.iter_content(chunk_size=8192):  # Download in chunks
                file.write(chunk)

    print(f"Saved binaries to file: {save_path}")
    return save_path
|
||||
|
||||
|
||||
def select_server(servers, cpu=None, os_name=None):
    """Selects a build server based on CPU architecture and OS filters."""
    def _matches(server):
        if server["status"] != "ready":
            return False
        if cpu and cpu.lower() not in server["cpu"].lower():
            return False
        if os_name and os_name.lower() not in server["os"].lower():
            return False
        return True

    # First server passing every filter, or None when nothing matches.
    return next((s for s in servers if _matches(s)), None)
|
||||
|
||||
|
||||
def process_new_job(args, server_data):
    """Fan a build out to every matching ready server.

    args supplies .build (project path) or .checkout (git url), plus optional
    .cpu / .os filters and .version; server_data is the list of status dicts
    from get_all_servers_status().
    """
    available_servers = [s for s in server_data if s["status"] == "ready"]

    if args.cpu:
        available_servers = [s for s in available_servers if args.cpu.lower() in s["cpu"].lower()]
    if args.os:
        available_servers = [s for s in available_servers if args.os.lower() in s["os"].lower()]

    if not available_servers:
        print("No available servers matching the criteria.")
        return

    # Keep only one server per (cpu, os) pair — building the same target
    # twice wastes agents.
    unique_servers = {}
    for server in available_servers:
        key = (server["cpu"], server["os"])
        if key not in unique_servers:
            unique_servers[key] = server

    available_servers = list(unique_servers.values())
    print(f"Found {len(available_servers)} servers available to build")
    print(tabulate(available_servers, headers="keys", tablefmt="grid"))

    zip_file = None
    if args.build:
        project_path = args.build
        zip_file = os.path.join(tempfile.gettempdir(), f"{os.path.basename(project_path)}.zip")
        zip_project(project_path, zip_file)
        print(f"Zipped {project_path} to {zip_file}")

    # Start builds on all matching servers
    with concurrent.futures.ThreadPoolExecutor() as executor:
        print("Submitting builds to:")
        download = True
        for server in available_servers:
            print(f"\t{server['hostname']} - {server['os']} - {server['cpu']}")
        # Use each server's advertised port rather than assuming DEFAULT_PORT
        # (the original always submitted to the default port).
        futures = {executor.submit(
            send_build_request, server["ip"], server.get("port", DEFAULT_PORT),
            zip_file, args.checkout, download, args.version):
            server for server in available_servers}

        # Collect results
        for future in concurrent.futures.as_completed(futures):
            server = futures[future]
            try:
                future.result()  # Re-raises any exception from the build thread
            except Exception as e:
                print(f"Build failed on {server['hostname']}: {e}")

    # Best-effort cleanup of the temporary zip.
    try:
        if zip_file:
            os.remove(zip_file)
    except Exception as e:
        print(f"Error removing zip file: {e}")
|
||||
|
||||
|
||||
def delete_cache(server_data):
    """Ask every ready server to clear its build cache (best effort)."""
    available_servers = [s for s in server_data if s["status"] == "ready"]
    print(f"Deleting cache on all available servers ({len(available_servers)})")
    for server in available_servers:
        try:
            # Timeout added so one hung agent can't stall the whole sweep.
            response = requests.get(
                f"http://{server['ip']}:{server.get('port', DEFAULT_PORT)}/delete_cache",
                timeout=TIMEOUT)
            response.raise_for_status()
            print(f"Cache cleared on {server['hostname']}")
        except Exception as e:
            # One failing agent shouldn't stop the rest.
            print(f"Error deleting cache on {server['hostname']}: {e}")
|
||||
|
||||
|
||||
def update_worker(server):
    """Push build_agent.py and requirements.txt to one server, then restart it.

    Returns the server dict on success, None on failure.
    """
    try:
        print(f"Updating {server['hostname']} from {server.get('agent_version')} => {build_agent_version}")

        # Reuse the handles opened by the with-statement. The original opened
        # each file a second time into the dict and leaked those handles.
        with open("build_agent.py", "rb") as file1, open("../requirements.txt", "rb") as file2:
            update_files = {
                "file1": file1,
                "file2": file2
            }
            response = requests.post(f"http://{server['ip']}:{server.get('port', DEFAULT_PORT)}/update",
                                     files=update_files)
        response.raise_for_status()

        response_json = response.json()
        if response_json.get('updated_files') and not response_json.get('error_files'):
            try:
                # The agent restarts itself, so a dropped connection is expected.
                requests.get(f"http://{server['ip']}:{server.get('port', DEFAULT_PORT)}/restart", timeout=TIMEOUT)
            except requests.exceptions.ConnectionError:
                pass
            return server
        else:
            print(f"Error updating {server['hostname']}. Errors: {response_json.get('error_files')} - Updated: {response_json.get('updated_files')}")
    except Exception as e:
        print(f"Unhandled error updating {server['hostname']}: {e}")
    return None
|
||||
|
||||
def update_build_workers(server_data):
    """Update every ready worker running an older agent, then verify restarts.

    Workers are updated concurrently; afterwards each updated server is polled
    (up to 30s) until it reports the new agent_version.
    """
    available_workers = [s for s in server_data if s["status"] == "ready"]
    workers_to_update = [x for x in available_workers if Version(x.get('agent_version')) < Version(build_agent_version)]

    if not workers_to_update:
        print(f"All {len(available_workers)} workers up to date")
        return

    print(f"Updating workers on all available servers ({len(workers_to_update)}) to {build_agent_version}")
    updated_servers = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {executor.submit(update_worker, server): server for server in workers_to_update}
        for future in concurrent.futures.as_completed(futures):
            server = future.result()
            if server:
                updated_servers.append(server)

    if updated_servers:
        print("Waiting for servers to restart...")
        unverified_servers = {server["ip"]: server for server in updated_servers}
        end_time = datetime.now() + timedelta(seconds=30)
        while unverified_servers and datetime.now() < end_time:
            for server_ip in list(unverified_servers.keys()):  # Iterate over a copy to avoid modification issues
                server_info = unverified_servers[server_ip]  # Get full server details
                try:
                    # BUG FIX: use this server's own port — the original read
                    # the port off a stale `server` loop variable left over
                    # from the futures loop above.
                    response = requests.get(
                        f"http://{server_ip}:{server_info.get('port', DEFAULT_PORT)}/status")
                    response.raise_for_status()
                    agent_version = response.json().get('agent_version')
                    if agent_version == build_agent_version:
                        print(f"Agent on {server_info['hostname']} successfully upgraded to {build_agent_version}")
                    else:
                        print(f"Agent on {server_info['hostname']} failed to upgrade. Still on version {agent_version}")
                    unverified_servers.pop(server_ip)
                except requests.exceptions.ConnectionError:
                    pass  # Server is still restarting
            if unverified_servers:
                time.sleep(1)  # Short delay before retrying to avoid spamming requests
        if unverified_servers:
            print("Some servers did not restart in time:", list(unverified_servers.keys()))

    print("Update complete")
|
||||
|
||||
|
||||
def shutdown_agent(hostname):
    """Ask the agent at hostname to shut down (best effort, never raises)."""
    print(f"Shutting down hostname: {hostname}")
    try:
        requests.get(f"http://{hostname}:{DEFAULT_PORT}/shutdown", timeout=TIMEOUT)
    except requests.exceptions.RequestException:
        # BUG FIX: requests timeouts raise requests.exceptions.Timeout, not the
        # builtin TimeoutError the original caught; also the agent may die
        # mid-response. Either way the shutdown was requested.
        pass
|
||||
|
||||
|
||||
def restart_agent(hostname):
    """Ask the agent at hostname to restart (best effort, never raises)."""
    print(f"Restarting agent: {hostname}")
    try:
        requests.get(f"http://{hostname}:{DEFAULT_PORT}/restart", timeout=TIMEOUT)
    except requests.exceptions.RequestException:
        # BUG FIX: requests timeouts raise requests.exceptions.Timeout, not the
        # builtin TimeoutError the original caught; a dropped connection during
        # restart is also expected.
        pass
|
||||
|
||||
|
||||
def main():
    """Command-line entry point: parse flags and dispatch to the matching action."""
    parser = argparse.ArgumentParser(description="Build agent manager for cross_py_builder")
    parser.add_argument("--status", action="store_true", help="Get status of available servers")
    parser.add_argument("--build", type=str, help="Path to the project to build")
    parser.add_argument("--checkout", type=str, help="Url to Git repo for checkout")
    parser.add_argument("-cpu", type=str, help="CPU architecture")
    parser.add_argument("-os", type=str, help="Operating system")
    parser.add_argument("-version", type=str, help="Version number for build")
    parser.add_argument("--delete-cache", action="store_true", help="Delete cache")
    parser.add_argument("--update-all", action="store_true", help="Update build agent")
    parser.add_argument("--restart", type=str, help="Hostname to restart")
    parser.add_argument("--restart-all", action="store_true", help="Restart all agents")
    parser.add_argument("--shutdown", type=str, help="Hostname to shutdown")
    parser.add_argument("--shutdown-all", action="store_true", help="Shutdown all agents")
    args = parser.parse_args()

    if args.status:
        # Show only servers that actually responded.
        online = [x for x in get_all_servers_status() if x['status'] != 'offline']
        print(tabulate(online, headers="keys", tablefmt="grid"))
        return

    if args.restart:
        restart_agent(args.restart)
    elif args.restart_all:
        print("Restarting all agents...")
        for server_ip in find_server_ips():
            restart_agent(server_ip)
    elif args.shutdown:
        shutdown_agent(args.shutdown)
    elif args.shutdown_all:
        print("Shutting down all agents...")
        for server_ip in find_server_ips():
            shutdown_agent(server_ip)
    elif args.delete_cache:
        delete_cache(get_all_servers_status())
    elif args.build or args.checkout:
        process_new_job(args, get_all_servers_status())
    elif args.update_all:
        update_build_workers(get_all_servers_status())
    else:
        print("No path given!")
|
||||
Reference in New Issue
Block a user