Update warc_wat_url_processor.py

This commit is contained in:
datechnoman 2024-01-23 04:43:15 +00:00
parent 513b32e80a
commit 6edffba451

View File

@ -1,9 +1,9 @@
import subprocess
import os import os
import gzip import gzip
import re import re
import traceback import traceback
from multiprocessing import Pool from concurrent.futures import ProcessPoolExecutor, as_completed, wait
import subprocess
def extract_urls_from_file(file_path): def extract_urls_from_file(file_path):
urls = [] urls = []
@ -23,6 +23,7 @@ def extract_urls_from_file(file_path):
return urls return urls
def process_file(file_path): def process_file(file_path):
try:
print(f"Processing file: {file_path}") print(f"Processing file: {file_path}")
# Extract URLs from the gzipped file # Extract URLs from the gzipped file
@ -40,7 +41,11 @@ def process_file(file_path):
# Use zstd command-line tool for compression # Use zstd command-line tool for compression
compressed_file_path = f'{output_file_path}.zst' compressed_file_path = f'{output_file_path}.zst'
command_compress = f'zstd -T0 -12 --long {output_file_path} -o {compressed_file_path}' command_compress = f'zstd -T0 -12 --long {output_file_path} -o {compressed_file_path}'
subprocess.run(command_compress, shell=True)
# Run the compression command synchronously and wait for it to complete
compression_process = subprocess.Popen(command_compress, shell=True)
compression_process.communicate()
print(f"Compressed file saved as '{compressed_file_path}'") print(f"Compressed file saved as '{compressed_file_path}'")
# Remove the original gzipped file # Remove the original gzipped file
@ -62,30 +67,33 @@ def process_file(file_path):
else: else:
print(f"Failed to remove {filename} from urls_to_download.txt") print(f"Failed to remove {filename} from urls_to_download.txt")
def extract_urls_from_directory(directory_path): except Exception as e:
file_list = sorted(os.listdir(directory_path)) print(f"Error during processing {file_path}: {e}")
pool = Pool(processes=7) traceback.print_exc()
pool.map(process_file, [os.path.join(directory_path, filename) for filename in file_list if filename.endswith('.warc.gz')])
pool.close()
pool.join()
def download_and_process_file(url):
try:
command = f'axel -n 4 {url}'
subprocess.run(command, shell=True)
file_path = os.path.join(os.getcwd(), os.path.basename(url))
process_file(file_path)
except Exception as e:
print(f"Error during download and processing {url}: {e}")
def main():
with open('urls_to_download.txt', 'r') as file: with open('urls_to_download.txt', 'r') as file:
urls = file.readlines() urls = file.readlines()
urls = [url.strip() for url in urls] urls = [url.strip() for url in urls]
batch_size = 48 download_concurrency_level = 4
concurrency_level = 4
batches = [urls[i:i+batch_size] for i in range(0, len(urls), batch_size)]
for batch in batches: # Start downloading and processing files in parallel
pool = Pool(processes=concurrency_level) with ProcessPoolExecutor(max_workers=download_concurrency_level) as executor:
futures = [executor.submit(download_and_process_file, url) for url in urls]
for url in batch: # Wait for all downloads and processing to complete before starting the next iteration
command = f'axel -n 4 {url}' wait(futures)
pool.apply_async(subprocess.run, args=(command,), kwds={'shell': True})
pool.close() if __name__ == "__main__":
pool.join() main()
extract_urls_from_directory(os.getcwd())