diff --git a/warc_wat_url_processor.py b/warc_wat_url_processor.py new file mode 100644 index 0000000..68215cf --- /dev/null +++ b/warc_wat_url_processor.py @@ -0,0 +1,102 @@ +import subprocess +import os +import gzip +import zstandard as zstd +import re +import traceback # Import traceback module +from multiprocessing import Pool + +def extract_urls_from_file(file_path): + urls = [] + try: + with gzip.open(file_path, 'rt', encoding='latin-1') as file: + for line in file: + # Extract URLs using regular expression + url_pattern = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+') + line_urls = re.findall(url_pattern, line) + urls.extend(line_urls) + except (gzip.BadGzipFile, EOFError) as e: + print(f"Error while reading the compressed file '{file_path}': {e}") + except Exception as e: + print(f"An unexpected error occurred while processing '{file_path}': {e}") + print("Full traceback:") + traceback.print_exc() # Print the full traceback + + return urls + +def process_file(file_path): + print(f"Processing file: {file_path}") + + # Extract URLs from the gzipped file + urls = extract_urls_from_file(file_path) + + # Create the output file path with '_urls.txt' extension + output_file_path = os.path.splitext(file_path)[0] + '_urls.txt' + print(f"Output file path: {output_file_path}") + + # Write the URLs to the output file + with open(output_file_path, 'w') as output_file: + output_file.write('\n'.join(urls)) + print(f"URLs written to {output_file_path}") + + # Compress the output file using zstd with compression level -10 + with open(output_file_path, 'rb') as input_file, open(output_file_path + '.zst', 'wb') as output_zstd_file: + cctx = zstd.ZstdCompressor(level=-10) + output_zstd_file.write(cctx.compress(input_file.read())) + print(f"Compressed file saved as '{output_file_path}.zst'") + + # Remove the original gzipped file + os.remove(file_path) + print(f"Original file removed: {file_path}") + +def extract_urls_from_directory(directory_path): + # Get the list of files in the directory and sort them + file_list = sorted(os.listdir(directory_path)) + + # Create a multiprocessing Pool with the number of processes + # based on the available CPU cores + pool = Pool(processes=7) + + # Map the file processing function to the list of files + pool.map(process_file, [os.path.join(directory_path, filename) for filename in file_list if filename.endswith('.warc.gz')]) + + # Close the pool to free up resources + pool.close() + pool.join() + +# Read the URLs from the file +with open('urls_to_download.txt', 'r') as file: + urls = file.readlines() + +# Remove any leading/trailing whitespace from the URLs +urls = [url.strip() for url in urls] + +# Define the batch size +batch_size = 48 + +# Define the concurrency level (number of download processes running concurrently) +concurrency_level = 4 + +# Split the URLs into batches +batches = [urls[i:i+batch_size] for i in range(0, len(urls), batch_size)] + +# Iterate over the batches and download URLs +for batch in batches: + # Create a multiprocessing Pool with the specified concurrency level + pool = Pool(processes=concurrency_level) + + for url in batch: + # Create the command to download the URL using axel with 3 connections + command = f'axel -n 2 {url}' + + # Start the subprocess in the background + pool.apply_async(subprocess.run, args=(command,), kwds={'shell': True}) + + # Close the pool to indicate that no more tasks will be added + pool.close() + + # Wait for all processes in the pool to finish + pool.join() + +# Extract URLs from the downloaded files +extract_urls_from_directory(os.getcwd())