From 29d24e9826b7ee0f889f8093d5358c6ee03b74ab Mon Sep 17 00:00:00 2001 From: datechnoman Date: Sun, 28 Jan 2024 11:36:17 +0000 Subject: [PATCH] Reverting --- warc_wat_url_processor.py | 90 ++++++++++++--------------------------- 1 file changed, 27 insertions(+), 63 deletions(-) diff --git a/warc_wat_url_processor.py b/warc_wat_url_processor.py index 4fbb8a0..a4fe03a 100644 --- a/warc_wat_url_processor.py +++ b/warc_wat_url_processor.py @@ -6,56 +6,43 @@ from concurrent.futures import ProcessPoolExecutor, as_completed, wait import subprocess import time -# Function to check disk space def check_disk_space(path, min_space_gb=20, check_interval=300): while True: try: - # Run the 'df' command to check disk space result = subprocess.run(['df', '-BG', path], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - - # If the command succeeds if result.returncode == 0: output = result.stdout.split('\n')[1].split() available_space_gb = int(output[3].replace('G', '')) - - # If available space is sufficient, break out of the loop if available_space_gb >= min_space_gb: break else: print(f"Waiting for more than {min_space_gb}GB free space on {path}. Current available space: {available_space_gb}GB") time.sleep(check_interval) else: - # If the command fails, wait and retry print("Error checking disk space.") time.sleep(check_interval) except Exception as e: - # If an exception occurs, wait and retry print(f"An error occurred while checking disk space: {e}") time.sleep(check_interval) -# Function to extract URLs from a gzip file def extract_urls_from_file(file_path): urls = [] try: with gzip.open(file_path, 'rt', encoding='latin-1') as file: for line in file: - # Regular expression to find URLs in a line url_pattern = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+') line_urls = re.findall(url_pattern, line) urls.extend(line_urls) except (gzip.BadGzipFile, EOFError) as e: - # Handle gzip-related errors print(f"Error while reading the compressed file '{file_path}': {e}") except Exception as e: - # Handle other unexpected errors print(f"An unexpected error occurred while processing '{file_path}': {e}") print("Full traceback:") traceback.print_exc() return urls -# Function to process a file -def process_file(file_path, error_log): +def process_file(file_path): try: print(f"Processing file: {file_path}") @@ -77,15 +64,13 @@ def process_file(file_path, error_log): try: # Run the compression command synchronously and wait for it to complete - result = subprocess.run(command_compress, shell=True, check=True, stderr=subprocess.PIPE, text=True) + result = subprocess.run(command_compress, shell=True, check=True) if result.returncode == 0: print(f"Compressed file saved as '{compressed_file_path}'") else: print(f"Compression failed for '{output_file_path}'") - error_log.write(f"Compression failed for '{output_file_path}': {result.stderr}\n") except subprocess.CalledProcessError as compression_error: print(f"Compression failed for '{output_file_path}': {compression_error}") - error_log.write(f"Compression failed for '{output_file_path}': {compression_error}\n") # Remove the original gzipped file os.remove(file_path) @@ -108,69 +93,48 @@ def process_file(file_path, error_log): print(f"Failed to remove {filename} from urls_to_download.txt") except Exception as e: - # Handle any exceptions that occur during file processing print(f"Error during processing {file_path}: {e}") traceback.print_exc() - error_log.write(f"Error during processing {file_path}: {e}\n") -# Function to download and process a file -def download_and_process_file(url, error_log_filename): +def download_and_process_file(url): try: - # Open the error log file for appending - with open(error_log_filename, 'a') as error_log: - command = f'axel -n 3 {url}' - result = subprocess.run(command, shell=True, check=True, stderr=subprocess.PIPE, text=True) - if result.returncode == 0: - file_path = os.path.join(os.getcwd(), os.path.basename(url)) - process_file(file_path, error_log) - else: - print(f"Download failed for {url}") - error_log.write(f"Download failed for {url}\n") - except Exception as e: - # Handle any exceptions that occur during download and processing - print(f"Error during download and processing {url}: {e}") - error_log.write(f"Error during download and processing {url}: {e}\n") + command = f'axel -n 3 {url}' + result = subprocess.run(command, shell=True, check=True) + if result.returncode == 0: + file_path = os.path.join(os.getcwd(), os.path.basename(url)) + process_file(file_path) + else: + print(f"Download failed for {url}") + + except Exception as e: + print(f"Error during download and processing {url}: {e}") -# Main function def main(): - # Check disk space check_disk_space('/dev/sda1') - # Read URLs from the file with open('urls_to_download.txt', 'r') as file: urls = file.readlines() urls = [url.strip() for url in urls] - # Define the concurrency level download_concurrency_level = 40 - # Define the error log filename - error_log_filename = 'error.log' + with ProcessPoolExecutor(max_workers=download_concurrency_level) as executor: + print("Submitting tasks to the ProcessPoolExecutor...") + futures = [executor.submit(download_and_process_file, url) for url in urls] + print(f"Submitted {len(futures)} tasks.") - # Open the error log file for appending - with open(error_log_filename, 'a') as error_log: - with ProcessPoolExecutor(max_workers=download_concurrency_level) as executor: - print("Submitting tasks to the ProcessPoolExecutor...") - # Submit tasks for each URL - futures = [executor.submit(download_and_process_file, url, error_log_filename) for url in urls] - print(f"Submitted {len(futures)} tasks.") + print("Waiting for tasks to complete...") + completed_futures, _ = wait(futures) + print(f"{len(completed_futures)} tasks completed.") - print("Waiting for tasks to complete...") - # Wait for all tasks to complete - completed_futures, _ = wait(futures) - print(f"{len(completed_futures)} tasks completed.") + for completed_future in completed_futures: + try: + result = completed_future.result() + print(f"Task result: {result}") + # Process the result if needed + except Exception as e: + print(f"Error in processing future: {e}") - # Process completed tasks - for completed_future in completed_futures: - try: - result = completed_future.result() - print(f"Task result: {result}") - # Process the result if needed - except Exception as e: - print(f"Error in processing future: {e}") - error_log.write(f"Error in processing future: {e}\n") - -# Entry point of the script if __name__ == "__main__": main() \ No newline at end of file