Add warc_wat_url_processor.py

This commit is contained in:
datechnoman 2024-01-20 03:25:50 +00:00
parent b881641c69
commit 99c2f07498

102
warc_wat_url_processor.py Normal file
View File

@ -0,0 +1,102 @@
import subprocess
import os
import gzip
import zstandard as zstd
import re
import traceback # Import traceback module
from multiprocessing import Pool
def extract_urls_from_file(file_path):
urls = []
try:
with gzip.open(file_path, 'rt', encoding='latin-1') as file:
for line in file:
# Extract URLs using regular expression
url_pattern = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')
line_urls = re.findall(url_pattern, line)
urls.extend(line_urls)
except (gzip.BadGzipFile, EOFError) as e:
print(f"Error while reading the compressed file '{file_path}': {e}")
except Exception as e:
print(f"An unexpected error occurred while processing '{file_path}': {e}")
print("Full traceback:")
traceback.print_exc() # Print the full traceback
return urls
def process_file(file_path):
print(f"Processing file: {file_path}")
# Extract URLs from the gzipped file
urls = extract_urls_from_file(file_path)
# Create the output file path with '_urls.txt' extension
output_file_path = os.path.splitext(file_path)[0] + '_urls.txt'
print(f"Output file path: {output_file_path}")
# Write the URLs to the output file
with open(output_file_path, 'w') as output_file:
output_file.write('\n'.join(urls))
print(f"URLs written to {output_file_path}")
# Compress the output file using zstd with compression level -10
with open(output_file_path, 'rb') as input_file, open(output_file_path + '.zst', 'wb') as output_zstd_file:
cctx = zstd.ZstdCompressor(level=-10)
output_zstd_file.write(cctx.compress(input_file.read()))
print(f"Compressed file saved as '{output_file_path}.zst'")
# Remove the original gzipped file
os.remove(file_path)
print(f"Original file removed: {file_path}")
def extract_urls_from_directory(directory_path):
# Get the list of files in the directory and sort them
file_list = sorted(os.listdir(directory_path))
# Create a multiprocessing Pool with the number of processes
# based on the available CPU cores
pool = Pool(processes=7)
# Map the file processing function to the list of files
pool.map(process_file, [os.path.join(directory_path, filename) for filename in file_list if filename.endswith('.warc.gz')])
# Close the pool to free up resources
pool.close()
pool.join()
# Read the URLs from the file
with open('urls_to_download.txt', 'r') as file:
urls = file.readlines()
# Remove any leading/trailing whitespace from the URLs
urls = [url.strip() for url in urls]
# Define the batch size
batch_size = 48
# Define the concurrency level (number of download processes running concurrently)
concurrency_level = 4
# Split the URLs into batches
batches = [urls[i:i+batch_size] for i in range(0, len(urls), batch_size)]
# Iterate over the batches and download URLs
for batch in batches:
# Create a multiprocessing Pool with the specified concurrency level
pool = Pool(processes=concurrency_level)
for url in batch:
# Create the command to download the URL using axel with 3 connections
command = f'axel -n 2 {url}'
# Start the subprocess in the background
pool.apply_async(subprocess.run, args=(command,), kwds={'shell': True})
# Close the pool to indicate that no more tasks will be added
pool.close()
# Wait for all processes in the pool to finish
pool.join()
# Extract URLs from the downloaded files
extract_urls_from_directory(os.getcwd())