Add archive_org_url_processor.py
This commit is contained in:
parent
722838b24a
commit
8ecbfc8696
141
archive_org_url_processor.py
Normal file
141
archive_org_url_processor.py
Normal file
@ -0,0 +1,141 @@
|
|||||||
|
import os
|
||||||
|
import gzip
|
||||||
|
import re
|
||||||
|
import traceback
|
||||||
|
from concurrent.futures import ProcessPoolExecutor, as_completed, wait
|
||||||
|
import subprocess
|
||||||
|
import time
|
||||||
|
import logging
|
||||||
|
|
||||||
|
# Set up logging
|
||||||
|
logging.basicConfig(filename='processing.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||||
|
|
||||||
|
def check_disk_space(path, min_space_gb=20, check_interval=300):
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
result = subprocess.run(['df', '-BG', path], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
||||||
|
if result.returncode == 0:
|
||||||
|
output = result.stdout.split('\n')[1].split()
|
||||||
|
available_space_gb = int(output[3].replace('G', ''))
|
||||||
|
if available_space_gb >= min_space_gb:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
logging.info(f"Waiting for more than {min_space_gb}GB free space on {path}. Current available space: {available_space_gb}GB")
|
||||||
|
time.sleep(check_interval)
|
||||||
|
else:
|
||||||
|
logging.error("Error checking disk space.")
|
||||||
|
time.sleep(check_interval)
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"An error occurred while checking disk space: {e}")
|
||||||
|
time.sleep(check_interval)
|
||||||
|
|
||||||
|
# Compile the regular expression pattern outside the function
|
||||||
|
URL_PATTERN = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')
|
||||||
|
|
||||||
|
def extract_urls_from_file(file_path, output_file_path):
|
||||||
|
try:
|
||||||
|
with gzip.open(file_path, 'rt', encoding='latin-1') as file:
|
||||||
|
with open(output_file_path, 'w') as output_file:
|
||||||
|
for line in file:
|
||||||
|
# Process each line to extract URLs
|
||||||
|
line_urls = re.findall(URL_PATTERN, line)
|
||||||
|
for url in line_urls:
|
||||||
|
output_file.write(url + '\n')
|
||||||
|
except (gzip.BadGzipFile, EOFError) as e:
|
||||||
|
logging.error(f"Error while reading the compressed file '{file_path}': {e}")
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"An unexpected error occurred while processing '{file_path}': {e}")
|
||||||
|
logging.error("Full traceback:")
|
||||||
|
logging.error(traceback.format_exc())
|
||||||
|
|
||||||
|
def process_file(file_path):
|
||||||
|
try:
|
||||||
|
logging.info(f"Processing file: {file_path}")
|
||||||
|
|
||||||
|
# Create the output file path with '_urls.txt' extension
|
||||||
|
output_file_path = os.path.splitext(file_path)[0] + '_urls.txt'
|
||||||
|
logging.info(f"Output file path: {output_file_path}")
|
||||||
|
|
||||||
|
# Extract URLs from the gzipped file and stream them to the output file
|
||||||
|
extract_urls_from_file(file_path, output_file_path)
|
||||||
|
|
||||||
|
# Use zstd command-line tool for compression
|
||||||
|
compressed_file_path = f'{output_file_path}.zst'
|
||||||
|
command_compress = f'zstd -T0 -12 --long --force {output_file_path} -o {compressed_file_path}'
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Run the compression command synchronously and wait for it to complete
|
||||||
|
result = subprocess.run(command_compress, shell=True, check=True)
|
||||||
|
if result.returncode == 0:
|
||||||
|
logging.info(f"Compressed file saved as '{compressed_file_path}'")
|
||||||
|
else:
|
||||||
|
logging.error(f"Compression failed for '{output_file_path}'")
|
||||||
|
except subprocess.CalledProcessError as compression_error:
|
||||||
|
logging.error(f"Compression failed for '{output_file_path}': {compression_error}")
|
||||||
|
|
||||||
|
# Remove the original gzipped file
|
||||||
|
os.remove(file_path)
|
||||||
|
logging.info(f"Original file removed: {file_path}")
|
||||||
|
|
||||||
|
# Remove the original _urls.txt file
|
||||||
|
os.remove(output_file_path)
|
||||||
|
logging.info(f"Original file removed: {output_file_path}")
|
||||||
|
|
||||||
|
# Remove the line containing the filename (without "_urls.txt") from urls_to_download.txt
|
||||||
|
filename = os.path.basename(output_file_path).replace('_urls.txt', '')
|
||||||
|
command = f'sed -i "/{filename}/d" "urls_to_download.txt"'
|
||||||
|
result = subprocess.run(command, shell=True)
|
||||||
|
if result.returncode == 0:
|
||||||
|
logging.info(f"File {filename} has been successfully removed from urls_to_download.txt")
|
||||||
|
with open('urls_to_download.txt', 'r') as file:
|
||||||
|
remaining_count = sum(1 for line in file)
|
||||||
|
logging.info(f"URLs remaining to be processed: {remaining_count}")
|
||||||
|
else:
|
||||||
|
logging.error(f"Failed to remove {filename} from urls_to_download.txt")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Error during processing {file_path}: {e}")
|
||||||
|
logging.error(traceback.format_exc())
|
||||||
|
|
||||||
|
def download_and_process_file(url):
|
||||||
|
try:
|
||||||
|
command = f'axel -n 3 {url}'
|
||||||
|
result = subprocess.run(command, shell=True, check=True)
|
||||||
|
if result.returncode == 0:
|
||||||
|
file_path = os.path.join(os.getcwd(), os.path.basename(url))
|
||||||
|
process_file(file_path)
|
||||||
|
else:
|
||||||
|
logging.error(f"Download failed for {url}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Error during download and processing {url}: {e}")
|
||||||
|
|
||||||
|
def main():
|
||||||
|
check_disk_space('/dev/sda1')
|
||||||
|
|
||||||
|
with open('urls_to_download.txt', 'r') as file:
|
||||||
|
urls = file.readlines()
|
||||||
|
|
||||||
|
urls = [url.strip() for url in urls]
|
||||||
|
|
||||||
|
download_concurrency_level = 22
|
||||||
|
|
||||||
|
with ProcessPoolExecutor(max_workers=download_concurrency_level) as executor:
|
||||||
|
logging.info("Submitting tasks to the ProcessPoolExecutor...")
|
||||||
|
futures = [executor.submit(download_and_process_file, url) for url in urls]
|
||||||
|
logging.info(f"Submitted {len(futures)} tasks.")
|
||||||
|
|
||||||
|
logging.info("Waiting for tasks to complete...")
|
||||||
|
completed_futures, _ = wait(futures)
|
||||||
|
logging.info(f"{len(completed_futures)} tasks completed.")
|
||||||
|
|
||||||
|
for completed_future in completed_futures:
|
||||||
|
try:
|
||||||
|
result = completed_future.result()
|
||||||
|
logging.info(f"Task result: {result}")
|
||||||
|
# Process the result if needed
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Error in processing future: {e}")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
Loading…
Reference in New Issue
Block a user