Update warc_wat_url_processor.py
This commit is contained in:
parent
b6a9c68140
commit
54747b64f6
@ -42,7 +42,7 @@ def extract_urls_from_file(file_path):
|
|||||||
|
|
||||||
return urls
|
return urls
|
||||||
|
|
||||||
def process_file(file_path):
|
def process_file(file_path, error_log):
|
||||||
try:
|
try:
|
||||||
print(f"Processing file: {file_path}")
|
print(f"Processing file: {file_path}")
|
||||||
|
|
||||||
@ -64,13 +64,15 @@ def process_file(file_path):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
# Run the compression command synchronously and wait for it to complete
|
# Run the compression command synchronously and wait for it to complete
|
||||||
result = subprocess.run(command_compress, shell=True, check=True)
|
result = subprocess.run(command_compress, shell=True, check=True, stderr=subprocess.PIPE, text=True)
|
||||||
if result.returncode == 0:
|
if result.returncode == 0:
|
||||||
print(f"Compressed file saved as '{compressed_file_path}'")
|
print(f"Compressed file saved as '{compressed_file_path}'")
|
||||||
else:
|
else:
|
||||||
print(f"Compression failed for '{output_file_path}'")
|
print(f"Compression failed for '{output_file_path}'")
|
||||||
|
error_log.write(f"Compression failed for '{output_file_path}': {result.stderr}\n")
|
||||||
except subprocess.CalledProcessError as compression_error:
|
except subprocess.CalledProcessError as compression_error:
|
||||||
print(f"Compression failed for '{output_file_path}': {compression_error}")
|
print(f"Compression failed for '{output_file_path}': {compression_error}")
|
||||||
|
error_log.write(f"Compression failed for '{output_file_path}': {compression_error}\n")
|
||||||
|
|
||||||
# Remove the original gzipped file
|
# Remove the original gzipped file
|
||||||
os.remove(file_path)
|
os.remove(file_path)
|
||||||
@ -95,19 +97,22 @@ def process_file(file_path):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error during processing {file_path}: {e}")
|
print(f"Error during processing {file_path}: {e}")
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
|
error_log.write(f"Error during processing {file_path}: {e}\n")
|
||||||
|
|
||||||
def download_and_process_file(url):
|
def download_and_process_file(url, error_log):
|
||||||
try:
|
try:
|
||||||
command = f'axel -n 3 {url}'
|
command = f'axel -n 3 {url}'
|
||||||
result = subprocess.run(command, shell=True, check=True)
|
result = subprocess.run(command, shell=True, check=True, stderr=subprocess.PIPE, text=True)
|
||||||
if result.returncode == 0:
|
if result.returncode == 0:
|
||||||
file_path = os.path.join(os.getcwd(), os.path.basename(url))
|
file_path = os.path.join(os.getcwd(), os.path.basename(url))
|
||||||
process_file(file_path)
|
process_file(file_path, error_log)
|
||||||
else:
|
else:
|
||||||
print(f"Download failed for {url}")
|
print(f"Download failed for {url}")
|
||||||
|
error_log.write(f"Download failed for {url}\n")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error during download and processing {url}: {e}")
|
print(f"Error during download and processing {url}: {e}")
|
||||||
|
error_log.write(f"Error during download and processing {url}: {e}\n")
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
check_disk_space('/dev/sda1')
|
check_disk_space('/dev/sda1')
|
||||||
@ -119,22 +124,24 @@ def main():
|
|||||||
|
|
||||||
download_concurrency_level = 40
|
download_concurrency_level = 40
|
||||||
|
|
||||||
with ProcessPoolExecutor(max_workers=download_concurrency_level) as executor:
|
with open('error.log', 'a') as error_log:
|
||||||
print("Submitting tasks to the ProcessPoolExecutor...")
|
with ProcessPoolExecutor(max_workers=download_concurrency_level) as executor:
|
||||||
futures = [executor.submit(download_and_process_file, url) for url in urls]
|
print("Submitting tasks to the ProcessPoolExecutor...")
|
||||||
print(f"Submitted {len(futures)} tasks.")
|
futures = [executor.submit(download_and_process_file, url, error_log) for url in urls]
|
||||||
|
print(f"Submitted {len(futures)} tasks.")
|
||||||
|
|
||||||
print("Waiting for tasks to complete...")
|
print("Waiting for tasks to complete...")
|
||||||
completed_futures, _ = wait(futures)
|
completed_futures, _ = wait(futures)
|
||||||
print(f"{len(completed_futures)} tasks completed.")
|
print(f"{len(completed_futures)} tasks completed.")
|
||||||
|
|
||||||
for completed_future in completed_futures:
|
for completed_future in completed_futures:
|
||||||
try:
|
try:
|
||||||
result = completed_future.result()
|
result = completed_future.result()
|
||||||
print(f"Task result: {result}")
|
print(f"Task result: {result}")
|
||||||
# Process the result if needed
|
# Process the result if needed
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error in processing future: {e}")
|
print(f"Error in processing future: {e}")
|
||||||
|
error_log.write(f"Error in processing future: {e}\n")
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
Loading…
Reference in New Issue
Block a user