From 8ecbfc8696c481a91e5efe1b117a22c8aab3e26f Mon Sep 17 00:00:00 2001
From: datechnoman
Date: Thu, 15 Feb 2024 11:29:58 +0000
Subject: [PATCH] Add archive_org_url_processor.py

---
 archive_org_url_processor.py | 141 +++++++++++++++++++++++++++++++++++
 1 file changed, 141 insertions(+)
 create mode 100644 archive_org_url_processor.py

diff --git a/archive_org_url_processor.py b/archive_org_url_processor.py
new file mode 100644
index 0000000..0a90eae
--- /dev/null
+++ b/archive_org_url_processor.py
@@ -0,0 +1,141 @@
+import os
+import gzip
+import re
+import traceback
+from concurrent.futures import ProcessPoolExecutor, as_completed, wait
+import subprocess
+import time
+import logging
+
+# Set up logging
+logging.basicConfig(filename='processing.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+
+def check_disk_space(path, min_space_gb=20, check_interval=300):
+    while True:
+        try:
+            result = subprocess.run(['df', '-BG', path], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
+            if result.returncode == 0:
+                output = result.stdout.split('\n')[1].split()
+                available_space_gb = int(output[3].replace('G', ''))
+                if available_space_gb >= min_space_gb:
+                    break
+                else:
+                    logging.info(f"Waiting for more than {min_space_gb}GB free space on {path}. Current available space: {available_space_gb}GB")
+                    time.sleep(check_interval)
+            else:
+                logging.error("Error checking disk space.")
+                time.sleep(check_interval)
+        except Exception as e:
+            logging.error(f"An error occurred while checking disk space: {e}")
+            time.sleep(check_interval)
+
+# Compile the regular expression pattern outside the function
+URL_PATTERN = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')
+
+def extract_urls_from_file(file_path, output_file_path):
+    try:
+        with gzip.open(file_path, 'rt', encoding='latin-1') as file:
+            with open(output_file_path, 'w') as output_file:
+                for line in file:
+                    # Process each line to extract URLs
+                    line_urls = URL_PATTERN.findall(line)
+                    for url in line_urls:
+                        output_file.write(url + '\n')
+    except (gzip.BadGzipFile, EOFError) as e:
+        logging.error(f"Error while reading the compressed file '{file_path}': {e}")
+    except Exception as e:
+        logging.error(f"An unexpected error occurred while processing '{file_path}': {e}")
+        logging.error("Full traceback:")
+        logging.error(traceback.format_exc())
+
+def process_file(file_path):
+    try:
+        logging.info(f"Processing file: {file_path}")
+
+        # Create the output file path with '_urls.txt' extension
+        output_file_path = os.path.splitext(file_path)[0] + '_urls.txt'
+        logging.info(f"Output file path: {output_file_path}")
+
+        # Extract URLs from the gzipped file and stream them to the output file
+        extract_urls_from_file(file_path, output_file_path)
+
+        # Use the zstd command-line tool for compression
+        compressed_file_path = f'{output_file_path}.zst'
+        command_compress = f'zstd -T0 -12 --long --force "{output_file_path}" -o "{compressed_file_path}"'
+
+        try:
+            # Run the compression command synchronously and wait for it to complete;
+            # check=True raises CalledProcessError on a non-zero exit status
+            subprocess.run(command_compress, shell=True, check=True)
+            logging.info(f"Compressed file saved as '{compressed_file_path}'")
+        except subprocess.CalledProcessError as compression_error:
+            logging.error(f"Compression failed for '{output_file_path}': {compression_error}")
+            # Keep the source files so nothing is lost if compression fails
+            return
+
+        # Remove the original gzipped file
+        os.remove(file_path)
+        logging.info(f"Original file removed: {file_path}")
+
+        # Remove the original _urls.txt file
+        os.remove(output_file_path)
+        logging.info(f"Original file removed: {output_file_path}")
+
+        # Remove the line containing the filename (without "_urls.txt") from urls_to_download.txt
+        filename = os.path.basename(output_file_path).replace('_urls.txt', '')
+        command = f'sed -i "/{filename}/d" "urls_to_download.txt"'
+        result = subprocess.run(command, shell=True)
+        if result.returncode == 0:
+            logging.info(f"File {filename} has been successfully removed from urls_to_download.txt")
+            with open('urls_to_download.txt', 'r') as file:
+                remaining_count = sum(1 for line in file)
+            logging.info(f"URLs remaining to be processed: {remaining_count}")
+        else:
+            logging.error(f"Failed to remove {filename} from urls_to_download.txt")
+
+    except Exception as e:
+        logging.error(f"Error during processing {file_path}: {e}")
+        logging.error(traceback.format_exc())
+
+def download_and_process_file(url):
+    try:
+        command = f'axel -n 3 "{url}"'
+        result = subprocess.run(command, shell=True)
+        if result.returncode == 0:
+            file_path = os.path.join(os.getcwd(), os.path.basename(url))
+            process_file(file_path)
+        else:
+            logging.error(f"Download failed for {url}")
+
+    except Exception as e:
+        logging.error(f"Error during download and processing {url}: {e}")
+
+def main():
+    check_disk_space('/dev/sda1')
+
+    with open('urls_to_download.txt', 'r') as file:
+        urls = file.readlines()
+
+    urls = [url.strip() for url in urls]
+
+    download_concurrency_level = 22
+
+    with ProcessPoolExecutor(max_workers=download_concurrency_level) as executor:
+        logging.info("Submitting tasks to the ProcessPoolExecutor...")
+        futures = [executor.submit(download_and_process_file, url) for url in urls]
+        logging.info(f"Submitted {len(futures)} tasks.")
+
+        logging.info("Waiting for tasks to complete...")
+        completed_futures, _ = wait(futures)
+        logging.info(f"{len(completed_futures)} tasks completed.")
+
+        for completed_future in completed_futures:
+            try:
+                result = completed_future.result()
+                logging.info(f"Task result: {result}")
+                # Process the result if needed
+            except Exception as e:
+                logging.error(f"Error in processing future: {e}")
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
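
Not part of the patch: a minimal sketch of how the new module's core extraction routine could be exercised on its own, assuming archive_org_url_processor.py is importable from the working directory. The file names sample.gz and sample_urls.txt are hypothetical and used only for illustration.

# Illustrative only -- not included in the diff above.
import gzip

import archive_org_url_processor as processor

# Write a tiny gzipped sample so extract_urls_from_file has something to scan.
with gzip.open('sample.gz', 'wt', encoding='latin-1') as f:
    f.write('payload with https://archive.org/details/example and http://example.com/a%20b\n')

# Stream every URL found in sample.gz into sample_urls.txt, one per line.
processor.extract_urls_from_file('sample.gz', 'sample_urls.txt')

with open('sample_urls.txt') as f:
    print(f.read())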