mirror of
https://github.com/blw1138/cross-py-builder.git
synced 2025-12-17 08:38:11 +00:00
305 lines
12 KiB
Python
Executable File
305 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import concurrent.futures
|
|
import os
|
|
import socket
|
|
import tempfile
|
|
import time
|
|
import zipfile
|
|
from datetime import datetime, timedelta
|
|
|
|
from packaging.version import Version
|
|
from tabulate import tabulate
|
|
|
|
import requests
|
|
|
|
from build_agent import build_agent_version
|
|
from zeroconf_server import ZeroconfServer
|
|
|
|
TIMEOUT = 10
|
|
|
|
def find_server_ips():
|
|
ZeroconfServer.configure("_crosspybuilder._tcp.local.", socket.gethostname(), 9001)
|
|
hostnames = []
|
|
try:
|
|
ZeroconfServer.start(listen_only=True)
|
|
time.sleep(.3) # give it time to find network
|
|
hostnames = ZeroconfServer.found_ip_addresses()
|
|
except KeyboardInterrupt:
|
|
pass
|
|
finally:
|
|
ZeroconfServer.stop()
|
|
|
|
# get known hosts
|
|
with open("known_hosts", "r") as file:
|
|
lines = file.readlines()
|
|
|
|
lines = [line.split(':')[0].strip() for line in lines]
|
|
hostnames.extend(lines)
|
|
|
|
return hostnames
|
|
|
|
def get_all_servers_status():
|
|
table_data = []
|
|
server_ips = find_server_ips()
|
|
with concurrent.futures.ThreadPoolExecutor() as executor:
|
|
results = executor.map(get_worker_status, server_ips) # Runs in parallel
|
|
table_data.extend(results)
|
|
return table_data
|
|
|
|
def get_worker_status(hostname):
|
|
"""Fetch worker status from the given hostname."""
|
|
try:
|
|
response = requests.get(f"http://{hostname}:9001/status", timeout=TIMEOUT)
|
|
status = response.json()
|
|
if status['hostname'] != hostname and status['ip'] != hostname:
|
|
status['ip'] = socket.gethostbyname(hostname)
|
|
return status
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"EXCEPTION: {e}")
|
|
return {"hostname": hostname, "status": "offline"}
|
|
|
|
|
|
def zip_project(source_dir, output_zip):
|
|
"""Zips the given directory."""
|
|
excluded_dirs = {"venv", ".venv", "dist", "build"}
|
|
with zipfile.ZipFile(output_zip, 'w', zipfile.ZIP_DEFLATED) as zipf:
|
|
for root, dirs, files in os.walk(source_dir):
|
|
|
|
dirs[:] = [d for d in dirs if d.lower() not in excluded_dirs]
|
|
for file in files:
|
|
file_path = os.path.join(root, file)
|
|
zipf.write(file_path, os.path.relpath(file_path, source_dir))
|
|
|
|
|
|
def send_build_request(zip_file, server_ip, download_after=False):
|
|
"""Uploads the zip file to the given server."""
|
|
upload_url = f"http://{server_ip}:9001/upload"
|
|
print(f"Submitting build request to URL: {upload_url} - Please wait. This may take a few minutes...")
|
|
with open(zip_file, 'rb') as f:
|
|
response = requests.post(upload_url, files={"file": f})
|
|
|
|
if response.status_code == 200:
|
|
response_data = response.json()
|
|
print(f"Build successful. ID: {response_data['id']} Hostname: {response_data['hostname']} OS: {response_data['os']} CPU: {response_data['cpu']} Spec files: {len(response_data['spec_files'])} - Elapsed time: {response_data['duration']}" )
|
|
if download_after:
|
|
download_url = f"http://{server_ip}:9001/download/{response_data.get('id')}"
|
|
try:
|
|
save_name = f"{os.path.splitext(os.path.basename(zip_file))[0]}-{response_data['os'].lower()}-{response_data['cpu'].lower()}.zip"
|
|
download_zip(download_url, save_name=save_name)
|
|
except Exception as e:
|
|
print(f"Error downloading zip: {e}")
|
|
else:
|
|
print("Upload failed:", response.status_code, response.text)
|
|
|
|
def download_zip(url, save_name, save_dir="."):
|
|
"""Download a ZIP file from a URL and save it with its original filename."""
|
|
response = requests.get(url, stream=True)
|
|
response.raise_for_status() # Raise an error if request fails
|
|
|
|
save_path = os.path.join(save_dir, save_name)
|
|
with open(save_path, "wb") as file:
|
|
for chunk in response.iter_content(chunk_size=8192): # Download in chunks
|
|
file.write(chunk)
|
|
|
|
print(f"Saved binaries to file: {save_path}")
|
|
return save_path
|
|
|
|
|
|
def select_server(servers, cpu=None, os_name=None):
|
|
"""Selects a build server based on CPU architecture and OS filters."""
|
|
available = [s for s in servers if s["status"] == "ready"]
|
|
|
|
if cpu:
|
|
available = [s for s in available if cpu.lower() in s["cpu"].lower()]
|
|
if os_name:
|
|
available = [s for s in available if os_name.lower() in s["os"].lower()]
|
|
return available[0] if available else None # Return first matching server or None
|
|
|
|
def process_new_job(args, server_data):
|
|
available_servers = [s for s in server_data if s["status"] == "ready"]
|
|
|
|
if args.cpu:
|
|
available_servers = [s for s in available_servers if args.cpu.lower() in s["cpu"].lower()]
|
|
if args.os:
|
|
available_servers = [s for s in available_servers if args.os.lower() in s["os"].lower()]
|
|
|
|
if not available_servers:
|
|
print("No available servers matching the criteria.")
|
|
return
|
|
|
|
# Keep only unique servers
|
|
unique_servers = {}
|
|
for server in available_servers:
|
|
key = (server["cpu"], server["os"])
|
|
if key not in unique_servers:
|
|
unique_servers[key] = server
|
|
|
|
available_servers = list(unique_servers.values())
|
|
print(f"Found {len(available_servers)} servers available to build")
|
|
print(tabulate(available_servers, headers="keys", tablefmt="grid"))
|
|
|
|
project_path = args.build
|
|
tmp_dir = tempfile.gettempdir()
|
|
zip_file = os.path.join(tmp_dir, f"{os.path.basename(project_path)}.zip")
|
|
zip_project(project_path, zip_file)
|
|
print(f"Zipped {project_path} to {zip_file}")
|
|
|
|
# Start builds on all matching servers
|
|
with concurrent.futures.ThreadPoolExecutor() as executor:
|
|
print("Submitting builds to:")
|
|
for server in available_servers:
|
|
print(f"\t{server['hostname']} - {server['os']} - {server['cpu']}")
|
|
futures = {executor.submit(send_build_request, zip_file, server["ip"], args.download): server for server in
|
|
available_servers}
|
|
|
|
# Collect results
|
|
for future in concurrent.futures.as_completed(futures):
|
|
server = futures[future]
|
|
try:
|
|
response = future.result() # Get the response
|
|
except Exception as e:
|
|
print(f"Build failed on {server['hostname']}: {e}")
|
|
|
|
try:
|
|
os.remove(zip_file)
|
|
except Exception as e:
|
|
print(f"Error removing zip file: {e}")
|
|
|
|
def delete_cache(server_data):
|
|
available_servers = [s for s in server_data if s["status"] == "ready"]
|
|
print(f"Deleting cache in from all available servers ({len(available_servers)})")
|
|
for server in available_servers:
|
|
try:
|
|
response = requests.get(f"http://{server['ip']}:9001/delete_cache")
|
|
response.raise_for_status()
|
|
print(f"Cache cleared on {server['hostname']}")
|
|
except Exception as e:
|
|
print(f"Error deleting cache on {server['hostname']}: {e}")
|
|
|
|
|
|
def update_worker(server):
|
|
try:
|
|
print(f"Updating {server['hostname']} from {server.get('agent_version')} => {build_agent_version}")
|
|
|
|
with open("build_agent.py", "rb") as file1, open("requirements.txt", "rb") as file2:
|
|
update_files = {
|
|
"file1": open("build_agent.py", "rb"),
|
|
"file2": open("requirements.txt", "rb")
|
|
}
|
|
response = requests.post(f"http://{server['ip']}:9001/update", files=update_files)
|
|
response.raise_for_status()
|
|
|
|
response_json = response.json()
|
|
if response_json.get('updated_files') and not response_json.get('error_files'):
|
|
try:
|
|
requests.get(f"http://{server['ip']}:9001/restart", timeout=TIMEOUT)
|
|
except requests.exceptions.ConnectionError:
|
|
pass
|
|
return server
|
|
else:
|
|
print(f"Error updating {server['hostname']}. Errors: {response_json.get('error_files')} - Updated: {response_json.get('updated_files')}")
|
|
except Exception as e:
|
|
print(f"Unhandled error updating {server['hostname']}: {e}")
|
|
return None
|
|
|
|
def update_build_workers(server_data):
|
|
available_workers = [s for s in server_data if s["status"] == "ready"]
|
|
workers_to_update = [x for x in available_workers if Version(x.get('agent_version')) < Version(build_agent_version)]
|
|
|
|
if not workers_to_update:
|
|
print(f"All {len(available_workers)} workers up to date")
|
|
return
|
|
|
|
print(f"Updating workers on all available servers ({len(workers_to_update)}) to {build_agent_version}")
|
|
updated_servers = []
|
|
with concurrent.futures.ThreadPoolExecutor() as executor:
|
|
futures = {executor.submit(update_worker, server): server for server in workers_to_update}
|
|
for future in concurrent.futures.as_completed(futures):
|
|
server = future.result()
|
|
if server:
|
|
updated_servers.append(server)
|
|
|
|
if updated_servers:
|
|
print("Waiting for servers to restart...")
|
|
unverified_servers = {server["ip"]: server for server in updated_servers}
|
|
end_time = datetime.now() + timedelta(seconds=30)
|
|
while unverified_servers and datetime.now() < end_time:
|
|
for server_ip in list(unverified_servers.keys()): # Iterate over a copy to avoid modification issues
|
|
try:
|
|
response = requests.get(f"http://{server_ip}:9001/status")
|
|
response.raise_for_status()
|
|
server_info = unverified_servers[server_ip] # Get full server details
|
|
agent_version = response.json().get('agent_version')
|
|
if agent_version == build_agent_version:
|
|
print(f"Agent on {server_info['hostname']} successfully upgraded to {build_agent_version}")
|
|
else:
|
|
print(f"Agent on {server_info['hostname']} failed to upgrade. Still on version {agent_version}")
|
|
unverified_servers.pop(server_ip)
|
|
except requests.exceptions.ConnectionError:
|
|
pass # Server is still restarting
|
|
if unverified_servers:
|
|
time.sleep(1) # Short delay before retrying to avoid spamming requests
|
|
if unverified_servers:
|
|
print("Some servers did not restart in time:", list(unverified_servers.keys()))
|
|
|
|
print("Update complete")
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Build agent manager for cross-py-builder")
|
|
parser.add_argument("--status", action="store_true", help="Get status of available servers")
|
|
parser.add_argument("--build", type=str, help="Path to the project to build")
|
|
parser.add_argument("-cpu", type=str, help="CPU architecture")
|
|
parser.add_argument("-os", type=str, help="Operating system")
|
|
parser.add_argument("-d", '--download', action="store_true", help="Download after build")
|
|
parser.add_argument("--delete-cache", action="store_true", help="Delete cache")
|
|
parser.add_argument("--update-all", action="store_true", help="Update build agent")
|
|
parser.add_argument("--restart", type=str, help="Hostname to restart")
|
|
parser.add_argument("--restart-all", action="store_true", help="Restart all agents")
|
|
parser.add_argument("--shutdown", type=str, help="Hostname to shutdown")
|
|
parser.add_argument("--shutdown-all", action="store_true", help="Restart all agents")
|
|
args = parser.parse_args()
|
|
|
|
if args.status:
|
|
print(tabulate(get_all_servers_status(), headers="keys", tablefmt="grid"))
|
|
return
|
|
elif args.restart:
|
|
restart_agent(args.restart)
|
|
elif args.restart_all:
|
|
print("Restarting all agents...")
|
|
for server_ip in find_server_ips():
|
|
restart_agent(server_ip)
|
|
elif args.shutdown:
|
|
shutdown_agent(args.shutdown)
|
|
elif args.shutdown_all:
|
|
print("Shutting down all agents...")
|
|
for server_ip in find_server_ips():
|
|
shutdown_agent(server_ip)
|
|
elif args.delete_cache:
|
|
delete_cache(get_all_servers_status())
|
|
elif args.build:
|
|
process_new_job(args, get_all_servers_status())
|
|
elif args.update_all:
|
|
update_build_workers(get_all_servers_status())
|
|
else:
|
|
print("No path given!")
|
|
|
|
def shutdown_agent(hostname):
|
|
print(f"Shutting down hostname: {hostname}")
|
|
try:
|
|
requests.get(f"http://{hostname}:9001/shutdown", timeout=TIMEOUT)
|
|
except (requests.exceptions.ConnectionError, TimeoutError):
|
|
pass
|
|
|
|
def restart_agent(hostname):
|
|
print(f"Restarting agent: {hostname}")
|
|
try:
|
|
requests.get(f"http://{hostname}:9001/restart", timeout=TIMEOUT)
|
|
except (requests.exceptions.ConnectionError, TimeoutError):
|
|
pass
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |