import logging
import os
import shutil
import tarfile
import tempfile
import zipfile

import requests
from tqdm import tqdm

supported_formats = ['.zip', '.tar.xz', '.dmg']

logger = logging.getLogger()

# (connect, read) timeouts in seconds. The read timeout bounds the wait between
# received bytes, not the whole transfer, so large downloads are unaffected.
_REQUEST_TIMEOUT = (10, 60)

# Number of bytes requested from the response stream per iteration.
_DOWNLOAD_CHUNK_SIZE = 64 * 1024


def download_and_extract_app(remote_url, download_location):
    """Download an archive from *remote_url* and unpack it into *download_location*.

    The archive is streamed into a private temporary directory, unpacked
    according to its extension (``.tar.xz``, ``.zip`` or ``.dmg``) and the
    temporary directory is removed afterwards, whatever the outcome.

    Args:
        remote_url: URL of the archive; its last path component is used as the
            local file name and, with the archive suffix stripped, as the
            name of the expected output directory.
        download_location: Directory that receives the extracted content. It
            is created if it does not exist.

    Returns:
        ``None`` when ``<download_location>/<output_dir_name>`` already exists
        (nothing is downloaded); otherwise *download_location*. Download and
        extraction failures are logged rather than raised, so a returned path
        does not by itself guarantee that extraction succeeded.
    """
    temp_download_dir = tempfile.mkdtemp()
    temp_downloaded_file_path = os.path.join(temp_download_dir, os.path.basename(remote_url))

    try:
        # Strip any supported archive suffix to get the output directory name.
        output_dir_name = os.path.basename(remote_url)
        for fmt in supported_formats:
            output_dir_name = output_dir_name.split(fmt)[0]

        if os.path.exists(os.path.join(download_location, output_dir_name)):
            logger.error(f"Engine download for {output_dir_name} already exists")
            return None

        downloaded = True
        # The temp dir is brand new, so the path can only pre-exist when the
        # URL has no file name component (the path is then the dir itself).
        if not os.path.exists(temp_downloaded_file_path):
            logger.info(f"Downloading {output_dir_name} from {remote_url}")
            downloaded = _download_file(remote_url, temp_downloaded_file_path)

        os.makedirs(download_location, exist_ok=True)

        # Do not try to unpack a file that was never written.
        if downloaded:
            _extract_archive(temp_downloaded_file_path, download_location, output_dir_name)
    except Exception as e:
        logger.exception(e)
    finally:
        # Always remove the temporary download, including on the early return.
        shutil.rmtree(temp_download_dir, ignore_errors=True)

    return download_location


def _download_file(remote_url, destination_path):
    """Stream *remote_url* into *destination_path*; return True on HTTP 200."""
    # The context manager releases the connection even if writing fails.
    with requests.get(remote_url, stream=True, timeout=_REQUEST_TIMEOUT) as response:
        if response.status_code != 200:
            logger.error(f"Failed to download the file. Status code: {response.status_code}")
            return False

        # A missing Content-Length header yields a total of 0.
        file_size = int(response.headers.get("Content-Length", 0))

        with open(destination_path, "wb") as file, \
                tqdm(total=file_size, unit="B", unit_scale=True) as progress_bar:
            for chunk in response.iter_content(chunk_size=_DOWNLOAD_CHUNK_SIZE):
                if chunk:
                    file.write(chunk)
                    progress_bar.update(len(chunk))

    logger.info(f"Successfully downloaded {os.path.basename(destination_path)}")
    return True


def _extract_archive(archive_path, download_location, output_dir_name):
    """Unpack *archive_path* into *download_location* based on its extension."""
    archive_name = os.path.basename(archive_path)
    lowered_path = archive_path.lower()

    # Process .tar.xz files
    if lowered_path.endswith('.tar.xz'):
        try:
            with tarfile.open(archive_path, 'r:xz') as tar:
                # The archive comes from the network: where the interpreter
                # supports extraction filters, refuse members that would
                # escape the destination directory.
                if hasattr(tarfile, 'data_filter'):
                    tar.extractall(path=download_location, filter='data')
                else:
                    tar.extractall(path=download_location)
            logger.info(f'Successfully extracted {archive_name} to {download_location}')
        except tarfile.TarError as e:
            logger.error(f'Error extracting {archive_path}: {e}')
        except FileNotFoundError:
            logger.error(f'File not found: {archive_path}')

    # Process .zip files
    elif lowered_path.endswith('.zip'):
        try:
            with zipfile.ZipFile(archive_path, 'r') as zip_ref:
                zip_ref.extractall(download_location)
            logger.info(f'Successfully extracted {archive_name} to {download_location}')
        except zipfile.BadZipFile:
            logger.error(f'Error: {archive_path} is not a valid ZIP file.')
        except FileNotFoundError:
            logger.error(f'File not found: {archive_path}')

    # Process .dmg files (macOS only)
    elif lowered_path.endswith('.dmg'):
        # Imported lazily so the module still loads where dmglib is absent.
        import dmglib

        dmg = dmglib.DiskImage(archive_path)
        mount_points = dmg.attach()
        try:
            for mount_point in mount_points:
                try:
                    copy_directory_contents(mount_point, os.path.join(download_location, output_dir_name))
                    logger.info(f'Successfully copied {archive_name} to {download_location}')
                except FileNotFoundError:
                    logger.error('Error: The source .app bundle does not exist.')
                except PermissionError:
                    logger.error(f'Error: Permission denied to copy {download_location}.')
                except Exception as e:
                    logger.error(f'An error occurred: {e}')
        finally:
            # Never leave the disk image mounted.
            dmg.detach()

    else:
        logger.error("Unknown file. Unable to extract binary.")


# Function to copy directory contents but ignore symbolic links and hidden files
def copy_directory_contents(src_dir, dest_dir):
    """Recursively copy *src_dir* into *dest_dir*, skipping symlinks and dotfiles.

    Args:
        src_dir: Directory whose entries are copied.
        dest_dir: Destination directory; created if it does not exist.

    Any exception is logged with its traceback and suppressed, so a failure
    in one directory level does not propagate to the caller.
    """
    try:
        os.makedirs(dest_dir, exist_ok=True)

        for item in os.listdir(src_dir):
            item_path = os.path.join(src_dir, item)

            # Ignore symbolic links and hidden entries (names starting with a dot)
            if os.path.islink(item_path) or item.startswith('.'):
                continue

            dest_item_path = os.path.join(dest_dir, item)
            if os.path.isdir(item_path):
                # Recurse into sub-directories
                copy_directory_contents(item_path, dest_item_path)
            else:
                # copy2 also carries over file metadata
                shutil.copy2(item_path, dest_item_path)
    except Exception as e:
        logger.exception(f"Error copying directory contents: {e}")