CommonCrawl_URL_Processor/warc_wat_url_processor.py

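# Downloads the Common Crawl WARC/WAT archives listed in urls_to_download.txt
# (one URL per line), extracts every URL found inside each gzipped archive,
# writes them to a zstd-compressed <archive>_urls.txt.zst file, and removes the
# finished archive's entry from urls_to_download.txt.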
import os
import gzip
import re
import traceback
import subprocess
from concurrent.futures import ProcessPoolExecutor, as_completed

# Compile the URL regex once at module level instead of once per input line.
URL_PATTERN = re.compile(r'https?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')
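
# Scan a gzipped WARC/WAT file line by line and collect every string matching
# URL_PATTERN. Read errors are reported but do not abort the run; an empty (or
# partial) list is returned instead.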
def extract_urls_from_file(file_path):
    urls = []
    try:
        with gzip.open(file_path, 'rt', encoding='latin-1') as file:
            for line in file:
                line_urls = URL_PATTERN.findall(line)
                urls.extend(line_urls)
    except (gzip.BadGzipFile, EOFError) as e:
        print(f"Error while reading the compressed file '{file_path}': {e}")
    except Exception as e:
        print(f"An unexpected error occurred while processing '{file_path}': {e}")
        print("Full traceback:")
        traceback.print_exc()
    return urls

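
# Full post-download pipeline for one archive: extract URLs, write them to
# <archive>_urls.txt, compress that file with zstd, delete the archive and the
# uncompressed URL list, and drop the archive's line from urls_to_download.txt.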
def process_file(file_path):
    try:
        print(f"Processing file: {file_path}")

        # Extract URLs from the gzipped file
        urls = extract_urls_from_file(file_path)

        # Create the output file path with '_urls.txt' extension
        output_file_path = os.path.splitext(file_path)[0] + '_urls.txt'
        print(f"Output file path: {output_file_path}")

        # Write the URLs to the output file
        with open(output_file_path, 'w') as output_file:
            output_file.write('\n'.join(urls))
        print(f"URLs written to {output_file_path}")

        # Compress the URL list with the zstd command-line tool; check=True raises
        # if zstd fails, so the cleanup below only runs after a successful compression.
        compressed_file_path = f'{output_file_path}.zst'
        command_compress = f'zstd -T0 -12 --long {output_file_path} -o {compressed_file_path}'
        subprocess.run(command_compress, shell=True, check=True)
        print(f"Compressed file saved as '{compressed_file_path}'")

        # Remove the original gzipped file
        os.remove(file_path)
        print(f"Original file removed: {file_path}")

        # Remove the uncompressed _urls.txt file
        os.remove(output_file_path)
        print(f"Intermediate file removed: {output_file_path}")

        # Remove the line containing the filename (without "_urls.txt") from urls_to_download.txt
        filename = os.path.basename(output_file_path).replace('_urls.txt', '')
        command = f'sed -i "/{filename}/d" "urls_to_download.txt"'
        result = subprocess.run(command, shell=True)
        if result.returncode == 0:
            print(f"File {filename} has been successfully removed from urls_to_download.txt")
            with open('urls_to_download.txt', 'r') as file:
                remaining_count = sum(1 for line in file)
            print(f"URLs remaining to be processed: {remaining_count}")
        else:
            print(f"Failed to remove {filename} from urls_to_download.txt")
    except Exception as e:
        print(f"Error during processing {file_path}: {e}")
        traceback.print_exc()

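
# Worker entry point: download one archive with axel, then run process_file on it.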
def download_and_process_file(url):
    try:
        # Download the file with axel using 3 parallel connections
        command = f'axel -n 3 {url}'
        result = subprocess.run(command, shell=True)
        if result.returncode == 0:
            file_path = os.path.join(os.getcwd(), os.path.basename(url))
            process_file(file_path)
        else:
            print(f"Download failed for {url}")
    except Exception as e:
        print(f"Error during download and processing {url}: {e}")

def main():
    with open('urls_to_download.txt', 'r') as file:
        urls = [url.strip() for url in file.readlines()]

    download_concurrency_level = 4

    # Start downloading and processing files in parallel
    with ProcessPoolExecutor(max_workers=download_concurrency_level) as executor:
        futures = [executor.submit(download_and_process_file, url) for url in urls]

        # Wait for all downloads and processing to complete, surfacing any
        # exception raised inside a worker process
        for future in as_completed(futures):
            future.result()


if __name__ == "__main__":
    main()