diff --git a/warc_wat_url_processor.py b/warc_wat_url_processor.py
index 77a0cce..7b7f177 100644
--- a/warc_wat_url_processor.py
+++ b/warc_wat_url_processor.py
@@ -1,9 +1,9 @@
-import subprocess
 import os
 import gzip
 import re
 import traceback
-from multiprocessing import Pool
+from concurrent.futures import ProcessPoolExecutor, as_completed, wait
+import subprocess

 def extract_urls_from_file(file_path):
     urls = []
@@ -23,69 +23,77 @@ def extract_urls_from_file(file_path):
     return urls

 def process_file(file_path):
-    print(f"Processing file: {file_path}")
+    try:
+        print(f"Processing file: {file_path}")

-    # Extract URLs from the gzipped file
-    urls = extract_urls_from_file(file_path)
+        # Extract URLs from the gzipped file
+        urls = extract_urls_from_file(file_path)

-    # Create the output file path with '_urls.txt' extension
-    output_file_path = os.path.splitext(file_path)[0] + '_urls.txt'
-    print(f"Output file path: {output_file_path}")
+        # Create the output file path with '_urls.txt' extension
+        output_file_path = os.path.splitext(file_path)[0] + '_urls.txt'
+        print(f"Output file path: {output_file_path}")

-    # Write the URLs to the output file
-    with open(output_file_path, 'w') as output_file:
-        output_file.write('\n'.join(urls))
-    print(f"URLs written to {output_file_path}")
+        # Write the URLs to the output file
+        with open(output_file_path, 'w') as output_file:
+            output_file.write('\n'.join(urls))
+        print(f"URLs written to {output_file_path}")

-    # Use zstd command-line tool for compression
-    compressed_file_path = f'{output_file_path}.zst'
-    command_compress = f'zstd -T0 -12 --long {output_file_path} -o {compressed_file_path}'
-    subprocess.run(command_compress, shell=True)
-    print(f"Compressed file saved as '{compressed_file_path}'")
+        # Use zstd command-line tool for compression
+        compressed_file_path = f'{output_file_path}.zst'
+        command_compress = f'zstd -T0 -12 --long {output_file_path} -o {compressed_file_path}'

-    # Remove the original gzipped file
-    os.remove(file_path)
-    print(f"Original file removed: {file_path}")
+        # Run the compression command synchronously and wait for it to complete
+        compression_process = subprocess.Popen(command_compress, shell=True)
+        compression_process.communicate()

-    # Remove the original _urls.txt file
-    os.remove(output_file_path)
-    print(f"Original file removed: {output_file_path}")
+        print(f"Compressed file saved as '{compressed_file_path}'")

-    # Remove the line containing the filename (without "_urls.txt") from urls_to_download.txt
-    filename = os.path.basename(output_file_path).replace('_urls.txt', '')
-    command = f'sed -i "/{filename}/d" "urls_to_download.txt"'
-    if subprocess.run(command, shell=True).returncode == 0:
-        print(f"File {filename} has been successfully removed from urls_to_download.txt")
-        with open('urls_to_download.txt', 'r') as file:
-            remaining_count = sum(1 for line in file)
-        print(f"URLs remaining to be processed: {remaining_count}")
-    else:
-        print(f"Failed to remove {filename} from urls_to_download.txt")
+        # Remove the original gzipped file
+        os.remove(file_path)
+        print(f"Original file removed: {file_path}")

-def extract_urls_from_directory(directory_path):
-    file_list = sorted(os.listdir(directory_path))
-    pool = Pool(processes=7)
-    pool.map(process_file, [os.path.join(directory_path, filename) for filename in file_list if filename.endswith('.warc.gz')])
-    pool.close()
-    pool.join()
+        # Remove the original _urls.txt file
+        os.remove(output_file_path)
+        print(f"Original file removed: {output_file_path}")

-with open('urls_to_download.txt', 'r') as file:
-    urls = file.readlines()
+        # Remove the line containing the filename (without "_urls.txt") from urls_to_download.txt
+        filename = os.path.basename(output_file_path).replace('_urls.txt', '')
+        command = f'sed -i "/{filename}/d" "urls_to_download.txt"'
+        if subprocess.run(command, shell=True).returncode == 0:
+            print(f"File {filename} has been successfully removed from urls_to_download.txt")
+            with open('urls_to_download.txt', 'r') as file:
+                remaining_count = sum(1 for line in file)
+            print(f"URLs remaining to be processed: {remaining_count}")
+        else:
+            print(f"Failed to remove {filename} from urls_to_download.txt")

-urls = [url.strip() for url in urls]
+    except Exception as e:
+        print(f"Error during processing {file_path}: {e}")
+        traceback.print_exc()

-batch_size = 48
-concurrency_level = 4
-batches = [urls[i:i+batch_size] for i in range(0, len(urls), batch_size)]
-
-for batch in batches:
-    pool = Pool(processes=concurrency_level)
-
-    for url in batch:
+def download_and_process_file(url):
+    try:
         command = f'axel -n 4 {url}'
-        pool.apply_async(subprocess.run, args=(command,), kwds={'shell': True})
+        subprocess.run(command, shell=True)
+        file_path = os.path.join(os.getcwd(), os.path.basename(url))
+        process_file(file_path)
+    except Exception as e:
+        print(f"Error during download and processing {url}: {e}")

-    pool.close()
-    pool.join()
+def main():
+    with open('urls_to_download.txt', 'r') as file:
+        urls = file.readlines()

-extract_urls_from_directory(os.getcwd())
+    urls = [url.strip() for url in urls]
+
+    download_concurrency_level = 4
+
+    # Start downloading and processing files in parallel
+    with ProcessPoolExecutor(max_workers=download_concurrency_level) as executor:
+        futures = [executor.submit(download_and_process_file, url) for url in urls]
+
+        # Wait for all downloads and processing to complete before starting the next iteration
+        wait(futures)
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
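
Note on the new executor loop: `as_completed` is imported by this patch but not used, and `wait(futures)` alone never inspects the futures, so any exception that escapes a worker process is silently dropped. A minimal sketch, not part of the patch, of how failures could be surfaced per URL as workers finish (it assumes `download_and_process_file` is importable from the patched module and a hypothetical `run_all` wrapper name):

```python
from concurrent.futures import ProcessPoolExecutor, as_completed

from warc_wat_url_processor import download_and_process_file

def run_all(urls, workers=4):
    # Submit every URL, then consume futures as they complete so that
    # exceptions raised in a worker process are re-raised (and logged) here.
    with ProcessPoolExecutor(max_workers=workers) as executor:
        futures = {executor.submit(download_and_process_file, url): url for url in urls}
        for future in as_completed(futures):
            url = futures[future]
            try:
                future.result()  # re-raises any exception from the worker
            except Exception as exc:
                print(f"Worker failed for {url}: {exc}")
```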