"""Download gzipped files, extract the URLs they contain and archive them.

For every URL listed in ``urls_to_download.txt`` the script downloads the file
with ``axel``, streams every URL found in the gzipped text into a
``*_urls.txt`` file, compresses that file with ``zstd`` and finally removes
the intermediate files and the corresponding line of ``urls_to_download.txt``.
"""
import os
import gzip
import re
import traceback
from concurrent.futures import ProcessPoolExecutor, as_completed, wait
import subprocess
import time
import logging

# Set up logging
logging.basicConfig(
    filename='processing.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# File listing the URLs that still have to be downloaded and processed.
PENDING_URLS_FILE = 'urls_to_download.txt'
# Path handed to ``df`` before any work is started.
DISK_SPACE_PATH = '/dev/sda1'
# Number of worker processes downloading and processing files in parallel.
DOWNLOAD_CONCURRENCY_LEVEL = 22


def check_disk_space(path, min_space_gb=20, check_interval=300):
    """Block until ``df`` reports at least ``min_space_gb`` GB free for ``path``.

    Args:
        path: Path passed to ``df -BG``.
        min_space_gb: Minimum number of free gigabytes required to return.
        check_interval: Seconds to sleep between two checks.

    Any failure (non-zero ``df`` exit status, unparsable output, missing
    ``df`` binary) is logged and retried after ``check_interval`` seconds, so
    the function only returns once enough space has been observed.
    """
    while True:
        try:
            result = subprocess.run(
                ['df', '-BG', path],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
            if result.returncode != 0:
                logging.error(f"Error checking disk space. {result.stderr.strip()}")
            else:
                # Second output line holds the figures; column 4 is "Available".
                fields = result.stdout.split('\n')[1].split()
                available_space_gb = int(fields[3].replace('G', ''))
                if available_space_gb >= min_space_gb:
                    return
                logging.info(
                    f"Waiting for more than {min_space_gb}GB free space on {path}. "
                    f"Current available space: {available_space_gb}GB"
                )
        except Exception as e:
            logging.error(f"An error occurred while checking disk space: {e}")
        time.sleep(check_interval)


# Compile the regular expression pattern outside the function
URL_PATTERN = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')


def extract_urls_from_file(file_path, output_file_path):
    """Stream every URL found in a gzipped text file into ``output_file_path``.

    The input is decoded as latin-1 and scanned line by line with
    ``URL_PATTERN``; each match is written on its own line.

    Args:
        file_path: Path of the gzip-compressed input file.
        output_file_path: Path of the text file to (over)write.

    Returns:
        True when the whole input was read and written successfully, False
        when an error occurred (the error is logged, never raised). On
        failure the output file may exist and be incomplete.
    """
    try:
        with gzip.open(file_path, 'rt', encoding='latin-1') as file, \
                open(output_file_path, 'w') as output_file:
            for line in file:
                output_file.writelines(
                    url + '\n' for url in URL_PATTERN.findall(line)
                )
    except (gzip.BadGzipFile, EOFError) as e:
        logging.error(f"Error while reading the compressed file '{file_path}': {e}")
        return False
    except Exception as e:
        logging.error(f"An unexpected error occurred while processing '{file_path}': {e}")
        logging.error("Full traceback:")
        logging.error(traceback.format_exc())
        return False
    return True


def _remove_from_pending(filename, list_path=PENDING_URLS_FILE):
    """Drop every line containing ``filename`` from the pending-URL list.

    The file is rewritten in place while holding an exclusive ``flock`` so
    that worker processes finishing at the same time cannot overwrite each
    other's removals.

    Returns:
        The number of lines left in the list.

    Raises:
        ValueError: If ``filename`` is empty (it would match every line).
        OSError: If the list cannot be opened or rewritten.
    """
    import fcntl  # POSIX-only; imported here to keep the module importable elsewhere

    if not filename:
        raise ValueError("refusing to remove an empty file name from the pending list")

    with open(list_path, 'r+') as pending:
        # The lock is released automatically when the file is closed.
        fcntl.flock(pending, fcntl.LOCK_EX)
        kept = [line for line in pending if filename not in line]
        pending.seek(0)
        pending.writelines(kept)
        pending.truncate()
    return len(kept)


def process_file(file_path):
    """Extract URLs from a downloaded gzip file, compress them and clean up.

    Steps: write the URLs to ``<file_path without extension>_urls.txt``,
    compress that file to ``*.zst`` with the ``zstd`` command-line tool,
    delete the downloaded file and the uncompressed URL list, then remove the
    file's entry from ``urls_to_download.txt``.

    If extraction or compression fails, the function logs the error and
    returns without deleting anything, so the input can be retried.
    All exceptions are logged and swallowed.

    Args:
        file_path: Path of the downloaded gzip file.
    """
    try:
        logging.info(f"Processing file: {file_path}")

        # Create the output file path with '_urls.txt' extension
        output_file_path = os.path.splitext(file_path)[0] + '_urls.txt'
        logging.info(f"Output file path: {output_file_path}")

        # Extract URLs from the gzipped file and stream them to the output file
        if not extract_urls_from_file(file_path, output_file_path):
            logging.error(f"URL extraction failed for '{file_path}'; keeping the original file")
            return

        # Use zstd command-line tool for compression. The command is an
        # argument list (no shell) so paths with spaces or shell
        # metacharacters are passed through untouched.
        compressed_file_path = f'{output_file_path}.zst'
        command_compress = [
            'zstd', '-T0', '-12', '--long', '--force',
            output_file_path, '-o', compressed_file_path,
        ]
        try:
            # Run the compression command synchronously and wait for it to complete
            subprocess.run(command_compress, check=True)
        except (subprocess.CalledProcessError, OSError) as compression_error:
            logging.error(f"Compression failed for '{output_file_path}': {compression_error}")
            # Nothing has been archived: do not delete the inputs.
            return
        logging.info(f"Compressed file saved as '{compressed_file_path}'")

        # Remove the original gzipped file
        os.remove(file_path)
        logging.info(f"Original file removed: {file_path}")

        # Remove the original _urls.txt file
        os.remove(output_file_path)
        logging.info(f"Original file removed: {output_file_path}")

        # Remove the line containing the filename (without "_urls.txt") from urls_to_download.txt
        filename = os.path.basename(output_file_path).replace('_urls.txt', '')
        try:
            remaining_count = _remove_from_pending(filename)
        except (OSError, ValueError) as removal_error:
            logging.error(f"Failed to remove {filename} from urls_to_download.txt: {removal_error}")
        else:
            logging.info(f"File {filename} has been successfully removed from urls_to_download.txt")
            logging.info(f"URLs remaining to be processed: {remaining_count}")
    except Exception as e:
        logging.error(f"Error during processing {file_path}: {e}")
        logging.error(traceback.format_exc())


def download_and_process_file(url):
    """Download ``url`` into the current directory with ``axel`` and process it.

    The downloaded file is expected at ``<cwd>/<basename of url>`` and is
    handed to ``process_file``. Download errors are logged and swallowed.

    Args:
        url: URL of the gzip file to download.
    """
    try:
        # Argument list instead of a shell string: URLs routinely contain
        # '&', '?' or ';', which a shell would interpret.
        subprocess.run(['axel', '-n', '3', url], check=True)
        file_path = os.path.join(os.getcwd(), os.path.basename(url))
        process_file(file_path)
    except Exception as e:
        logging.error(f"Error during download and processing {url}: {e}")


def main():
    """Wait for free disk space, then download and process every pending URL."""
    check_disk_space(DISK_SPACE_PATH)

    with open(PENDING_URLS_FILE, 'r') as file:
        # Skip blank lines so no worker is started for an empty URL.
        urls = [line.strip() for line in file if line.strip()]

    with ProcessPoolExecutor(max_workers=DOWNLOAD_CONCURRENCY_LEVEL) as executor:
        logging.info("Submitting tasks to the ProcessPoolExecutor...")
        futures = [executor.submit(download_and_process_file, url) for url in urls]
        logging.info(f"Submitted {len(futures)} tasks.")

        logging.info("Waiting for tasks to complete...")
        completed_futures, _ = wait(futures)
        logging.info(f"{len(completed_futures)} tasks completed.")

        for completed_future in completed_futures:
            try:
                result = completed_future.result()
                logging.info(f"Task result: {result}")
            except Exception as e:
                logging.error(f"Error in processing future: {e}")


if __name__ == "__main__":
    main()