From 54747b64f6665bed5fd47d6a681bc3e09cea8aa5 Mon Sep 17 00:00:00 2001 From: datechnoman Date: Sun, 28 Jan 2024 11:23:27 +0000 Subject: [PATCH] Update warc_wat_url_processor.py --- warc_wat_url_processor.py | 45 ++++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/warc_wat_url_processor.py b/warc_wat_url_processor.py index a4fe03a..1e66170 100644 --- a/warc_wat_url_processor.py +++ b/warc_wat_url_processor.py @@ -42,7 +42,7 @@ def extract_urls_from_file(file_path): return urls -def process_file(file_path): +def process_file(file_path, error_log): try: print(f"Processing file: {file_path}") @@ -64,13 +64,15 @@ def process_file(file_path): try: # Run the compression command synchronously and wait for it to complete - result = subprocess.run(command_compress, shell=True, check=True) + result = subprocess.run(command_compress, shell=True, check=True, stderr=subprocess.PIPE, text=True) if result.returncode == 0: print(f"Compressed file saved as '{compressed_file_path}'") else: print(f"Compression failed for '{output_file_path}'") + error_log.write(f"Compression failed for '{output_file_path}': {result.stderr}\n") except subprocess.CalledProcessError as compression_error: print(f"Compression failed for '{output_file_path}': {compression_error}") + error_log.write(f"Compression failed for '{output_file_path}': {compression_error}\n") # Remove the original gzipped file os.remove(file_path) @@ -95,19 +97,22 @@ def process_file(file_path): except Exception as e: print(f"Error during processing {file_path}: {e}") traceback.print_exc() + error_log.write(f"Error during processing {file_path}: {e}\n") -def download_and_process_file(url): +def download_and_process_file(url, error_log): try: command = f'axel -n 3 {url}' - result = subprocess.run(command, shell=True, check=True) + result = subprocess.run(command, shell=True, check=True, stderr=subprocess.PIPE, text=True) if result.returncode == 0: file_path = os.path.join(os.getcwd(), os.path.basename(url)) - process_file(file_path) + process_file(file_path, error_log) else: print(f"Download failed for {url}") + error_log.write(f"Download failed for {url}\n") except Exception as e: print(f"Error during download and processing {url}: {e}") + error_log.write(f"Error during download and processing {url}: {e}\n") def main(): check_disk_space('/dev/sda1') @@ -119,22 +124,24 @@ def main(): download_concurrency_level = 40 - with ProcessPoolExecutor(max_workers=download_concurrency_level) as executor: - print("Submitting tasks to the ProcessPoolExecutor...") - futures = [executor.submit(download_and_process_file, url) for url in urls] - print(f"Submitted {len(futures)} tasks.") + with open('error.log', 'a') as error_log: + with ProcessPoolExecutor(max_workers=download_concurrency_level) as executor: + print("Submitting tasks to the ProcessPoolExecutor...") + futures = [executor.submit(download_and_process_file, url, error_log) for url in urls] + print(f"Submitted {len(futures)} tasks.") - print("Waiting for tasks to complete...") - completed_futures, _ = wait(futures) - print(f"{len(completed_futures)} tasks completed.") + print("Waiting for tasks to complete...") + completed_futures, _ = wait(futures) + print(f"{len(completed_futures)} tasks completed.") - for completed_future in completed_futures: - try: - result = completed_future.result() - print(f"Task result: {result}") - # Process the result if needed - except Exception as e: - print(f"Error in processing future: {e}") + for completed_future in completed_futures: + try: + result = completed_future.result() + print(f"Task result: {result}") + # Process the result if needed + except Exception as e: + print(f"Error in processing future: {e}") + error_log.write(f"Error in processing future: {e}\n") if __name__ == "__main__": main() \ No newline at end of file