Mirror of https://github.com/blw1138/cross-py-builder.git (synced 2025-12-17 08:38:11 +00:00)

Initial commit

agent_manager.py (new file, 294 lines)

@@ -0,0 +1,294 @@
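"""Build agent manager for cross-py-builder.

Discovers build agents on the local network (via Zeroconf plus a known_hosts
file), zips a project and submits it to matching agents, and can download
results, clear caches, and update, restart, or shut down agents.

Example invocations (a sketch based on the arguments defined in main(); the
-os/-cpu values are illustrative and are substring-matched against whatever
each agent reports):

    python agent_manager.py -status
    python agent_manager.py -path ./my_project -os windows -cpu arm64 --download
    python agent_manager.py -update-all
"""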
import argparse
import concurrent.futures
import os
import socket
import tempfile
import time
import zipfile
from datetime import datetime, timedelta

import requests
from packaging.version import Version
from tabulate import tabulate

from build_agent import build_agent_version
from zeroconf_server import ZeroconfServer

def find_server_ips():
    """Discover build servers via Zeroconf and merge in any statically known hosts."""
    ZeroconfServer.configure("_crosspybuilder._tcp.local.", socket.gethostname(), 9001)
    hostnames = []
    try:
        ZeroconfServer.start(listen_only=True)
        time.sleep(.3)  # give the listener a moment to discover hosts on the network
        hostnames = ZeroconfServer.found_ip_addresses()
    except KeyboardInterrupt:
        pass
    finally:
        ZeroconfServer.stop()

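    # Note: the parsing below implies known_hosts is a plain text file with one entry
    # per line, where only the part before an optional ':' is used, e.g.
    # "buildbox01:9001" or "192.168.1.50" (illustrative entries, not taken from the repo).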
    # Merge in statically configured hosts, if a known_hosts file is present
    if os.path.exists("known_hosts"):
        with open("known_hosts", "r") as file:
            lines = file.readlines()
        hostnames.extend(line.split(':')[0].strip() for line in lines)

    return hostnames

def get_all_servers_status():
    """Query every discovered server for its status, in parallel."""
    table_data = []
    server_ips = find_server_ips()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = executor.map(get_worker_status, server_ips)  # one status request per server, run in parallel
        table_data.extend(results)
    return table_data

def get_worker_status(hostname):
    """Fetch worker status from the given hostname."""
    try:
        response = requests.get(f"http://{hostname}:9001/status", timeout=5)
        status = response.json()
        # If the agent reports neither the hostname nor the IP we contacted,
        # resolve the contacted hostname so later requests can reach it directly.
        if status['hostname'] != hostname and status['ip'] != hostname:
            status['ip'] = socket.gethostbyname(hostname)
        return status
    except requests.exceptions.RequestException:
        return {"hostname": hostname, "status": "offline"}

def zip_project(source_dir, output_zip):
    """Zips the given directory."""
    excluded_dirs = {"venv", ".venv", "dist", "build"}
    with zipfile.ZipFile(output_zip, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, dirs, files in os.walk(source_dir):
            # Prune virtualenv and build output directories in place so os.walk skips them
            dirs[:] = [d for d in dirs if d.lower() not in excluded_dirs]
            for file in files:
                file_path = os.path.join(root, file)
                zipf.write(file_path, os.path.relpath(file_path, source_dir))

def send_build_request(zip_file, server_ip, download_after=False):
    """Uploads the zip file to the given server."""
    upload_url = f"http://{server_ip}:9001/upload"
    print(f"Submitting build request to URL: {upload_url}")
    print("Please wait. This may take a few minutes...")
    with open(zip_file, 'rb') as f:
        response = requests.post(upload_url, files={"file": f})

    if response.status_code == 200:
        response_data = response.json()
        print(f"Build successful. ID: {response_data['id']} "
              f"Hostname: {response_data['hostname']} "
              f"OS: {response_data['os']} "
              f"CPU: {response_data['cpu']} "
              f"Spec files: {len(response_data['spec_files'])} - "
              f"Elapsed time: {response_data['duration']}")
        if download_after:
            download_url = f"http://{server_ip}:9001/download/{response_data.get('id')}"
            try:
                save_name = (f"{os.path.splitext(os.path.basename(zip_file))[0]}"
                             f"-{response_data['os'].lower()}-{response_data['cpu'].lower()}.zip")
                download_zip(download_url, save_name=save_name)
            except Exception as e:
                print(f"Error downloading zip: {e}")
    else:
        print("Upload failed:", response.status_code, response.text)

def download_zip(url, save_name, save_dir="."):
    """Download a ZIP file from a URL and save it under the given name."""
    response = requests.get(url, stream=True)
    response.raise_for_status()  # Raise an error if the request fails

    save_path = os.path.join(save_dir, save_name)
    with open(save_path, "wb") as file:
        for chunk in response.iter_content(chunk_size=8192):  # Download in chunks
            file.write(chunk)

    print(f"Saved binaries to file: {save_path}")
    return save_path

def select_server(servers, cpu=None, os_name=None):
    """Selects a build server based on CPU architecture and OS filters."""
    available = [s for s in servers if s["status"] == "ready"]

    if cpu:
        available = [s for s in available if cpu.lower() in s["cpu"].lower()]
    if os_name:
        available = [s for s in available if os_name.lower() in s["os"].lower()]
    return available[0] if available else None  # Return first matching server or None

def process_new_job(args, server_data):
    """Zip the project at args.path and submit it to every matching, unique build server."""
    available_servers = [s for s in server_data if s["status"] == "ready"]

    if args.cpu:
        available_servers = [s for s in available_servers if args.cpu.lower() in s["cpu"].lower()]
    if args.os:
        available_servers = [s for s in available_servers if args.os.lower() in s["os"].lower()]

    if not available_servers:
        print("No available servers matching the criteria.")
        return

    # Keep only one server per unique (cpu, os) combination
    unique_servers = {}
    for server in available_servers:
        key = (server["cpu"], server["os"])
        if key not in unique_servers:
            unique_servers[key] = server

    available_servers = list(unique_servers.values())
    print(f"Found {len(available_servers)} servers available to build")
    print(tabulate(available_servers, headers="keys", tablefmt="grid"))

    tmp_dir = tempfile.gettempdir()
    zip_file = os.path.join(tmp_dir, f"{os.path.basename(args.path)}.zip")
    zip_project(args.path, zip_file)
    print(f"Zipped {args.path} to {zip_file}")

    # Start builds on all matching servers
    with concurrent.futures.ThreadPoolExecutor() as executor:
        print("Submitting builds to:")
        for server in available_servers:
            print(f"\t{server['hostname']} - {server['os']} - {server['cpu']}")
        futures = {executor.submit(send_build_request, zip_file, server["ip"], args.download): server
                   for server in available_servers}

        # Collect results
        for future in concurrent.futures.as_completed(futures):
            server = futures[future]
            try:
                future.result()  # Re-raises any exception from the build request
            except Exception as e:
                print(f"Build failed on {server['hostname']}: {e}")

    try:
        os.remove(zip_file)
    except Exception as e:
        print(f"Error removing zip file: {e}")

def delete_cache(server_data):
    """Clear the build cache on every available server."""
    available_servers = [s for s in server_data if s["status"] == "ready"]
    print(f"Deleting cache on all available servers ({len(available_servers)})")
    for server in available_servers:
        try:
            response = requests.get(f"http://{server['ip']}:9001/delete_cache")
            response.raise_for_status()
            print(f"Cache cleared on {server['hostname']}")
        except Exception as e:
            print(f"Error deleting cache on {server['hostname']}: {e}")

def update_worker(server):
    """Push the local build_agent.py and requirements.txt to a server and trigger a restart."""
    try:
        print(f"Updating {server['hostname']} from {server.get('agent_version')} => {build_agent_version}")

        with open("build_agent.py", "rb") as file1, open("requirements.txt", "rb") as file2:
            update_files = {
                "file1": file1,
                "file2": file2
            }
            response = requests.post(f"http://{server['ip']}:9001/update", files=update_files)
        response.raise_for_status()

        response_json = response.json()
        if response_json.get('updated_files') and not response_json.get('error_files'):
            try:
                requests.get(f"http://{server['ip']}:9001/restart", timeout=2)
            except requests.exceptions.ConnectionError:
                pass
            return server
        else:
            print(f"Error updating {server['hostname']}. Errors: {response_json.get('error_files')} - Updated: {response_json.get('updated_files')}")
    except Exception as e:
        print(f"Unhandled error updating {server['hostname']}: {e}")
    return None

def update_build_workers(server_data):
    """Update the build agent on every available worker running an older version."""
    available_workers = [s for s in server_data if s["status"] == "ready"]
    workers_to_update = [x for x in available_workers if Version(x.get('agent_version')) < Version(build_agent_version)]

    if not workers_to_update:
        print(f"All {len(available_workers)} workers up to date")
        return

    print(f"Updating workers on all available servers ({len(workers_to_update)}) to {build_agent_version}")
    updated_servers = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {executor.submit(update_worker, server): server for server in workers_to_update}
        for future in concurrent.futures.as_completed(futures):
            server = future.result()
            if server:
                updated_servers.append(server)

    if updated_servers:
        print("Waiting for servers to restart...")
        unverified_servers = {server["ip"]: server for server in updated_servers}
        end_time = datetime.now() + timedelta(seconds=30)
        while unverified_servers and datetime.now() < end_time:
            for server_ip in list(unverified_servers.keys()):  # Iterate over a copy to avoid modification issues
                try:
                    response = requests.get(f"http://{server_ip}:9001/status", timeout=2)
                    response.raise_for_status()
                    server_info = unverified_servers[server_ip]  # Get full server details
                    agent_version = response.json().get('agent_version')
                    if agent_version == build_agent_version:
                        print(f"Agent on {server_info['hostname']} successfully upgraded to {build_agent_version}")
                    else:
                        print(f"Agent on {server_info['hostname']} failed to upgrade. Still on version {agent_version}")
                    unverified_servers.pop(server_ip)
                except requests.exceptions.RequestException:
                    pass  # Server is still restarting or not yet responding
            if unverified_servers:
                time.sleep(1)  # Short delay before retrying to avoid spamming requests
        if unverified_servers:
            print("Some servers did not restart in time:", list(unverified_servers.keys()))

    print("Update complete")

def main():
    parser = argparse.ArgumentParser(description="Build agent manager for cross-py-builder")
    parser.add_argument("-status", action="store_true", help="Get status of available servers")
    parser.add_argument("-path", type=str, help="Path to the project")
    parser.add_argument("-cpu", type=str, help="CPU architecture")
    parser.add_argument("-os", type=str, help="Operating system")
    parser.add_argument("-d", '--download', action="store_true", help="Download after build")
    parser.add_argument("-delete-cache", action="store_true", help="Delete cache")
    parser.add_argument("-update-all", action="store_true", help="Update build agent")
    parser.add_argument("-restart", type=str, help="Hostname to restart")
    parser.add_argument("-restart-all", action="store_true", help="Restart all agents")
    parser.add_argument("-shutdown", type=str, help="Hostname to shutdown")
    # parser.add_argument("-worker", type=str, help="Update Agents")
    args = parser.parse_args()

    if args.status:
        print(tabulate(get_all_servers_status(), headers="keys", tablefmt="grid"))
        return
    elif args.restart:
        restart_agent(args.restart)
    elif args.restart_all:
        print("Restarting all agents...")
        for server_ip in find_server_ips():
            restart_agent(server_ip)
    elif args.shutdown:
        print(f"Shutting down hostname: {args.shutdown}")
        try:
            requests.get(f"http://{args.shutdown}:9001/shutdown")
        except requests.exceptions.ConnectionError:
            pass
    elif args.delete_cache:
        delete_cache(get_all_servers_status())
    elif args.path:
        process_new_job(args, get_all_servers_status())
    elif args.update_all:
        update_build_workers(get_all_servers_status())
    else:
        print("No action specified. Use -h to see the available options.")

def restart_agent(hostname):
    """Ask the agent at the given hostname to restart itself."""
    print(f"Restarting agent: {hostname}")
    try:
        requests.get(f"http://{hostname}:9001/restart")
    except requests.exceptions.ConnectionError:
        pass  # Expected if the agent restarts before responding

if __name__ == "__main__":
    main()