import subprocess import os from concurrent.futures import ThreadPoolExecutor from threading import Lock def process_file(file, directory_path, output_file_path, keyword, counter_lock, processed_counter): file_path = os.path.join(directory_path, file) print(f"\nProcessing {file_path}...") # Determine the appropriate command based on file extension if file.endswith(".gz"): command = f"zcat {file_path}" elif file.endswith(".zst"): command = f"zstdcat {file_path}" elif file.endswith(".txt"): command = f"cat {file_path}" else: print(f"Skipping {file_path}. Unsupported file extension.") return # Load the entire file content into memory try: file_content = subprocess.check_output(command, shell=True, text=True) except subprocess.CalledProcessError as e: print(f"Error processing {file_path}: {e}") return # Stream the output line by line and append to the output file with open(output_file_path, "a") as output_file: for line in file_content.splitlines(): if keyword in line: output_file.write(line + '\n') # Update the processed files count with counter_lock: processed_counter[0] += 1 remaining_count = len(files) - processed_counter[0] print(f"{file_path} processed. URLs appended to {output_file_path}. Remaining files: {remaining_count}") # Ask the user for the directory containing files directory_path = input("Enter the directory path containing files: ") # Ensure the directory exists if not os.path.exists(directory_path): print(f"Error: The directory '{directory_path}' does not exist.") exit() # List all files in the directory files = os.listdir(directory_path) # Check if there are any files in the directory if not files: print("Error: No files found in the specified directory.") exit() # Ask the user for the output directory output_directory = input("Enter the output directory path: ") # Ensure the output directory exists; if not, create it if not os.path.exists(output_directory): os.makedirs(output_directory) # Ask the user for the keyword keyword = input("Enter the keyword to search for: ") # Generate the output file name based on the directory name and keyword directory_name = os.path.basename(directory_path) output_file_name = f"{directory_name}_{keyword}_output.txt" output_file_path = os.path.join(output_directory, output_file_name) # Ask the user for the number of concurrent instances num_concurrent_instances = int(input("Enter the number of concurrent instances: ")) # Use ThreadPoolExecutor to run the specified number of concurrent instances counter_lock = Lock() processed_counter = [0] # Using a list to store an integer (mutable) to pass by reference with ThreadPoolExecutor(max_workers=num_concurrent_instances) as executor: # Submit each file for processing futures = [executor.submit(process_file, file, directory_path, output_file_path, keyword, counter_lock, processed_counter) for file in files] # Wait for all tasks to complete for future in futures: future.result() print(f"\nAll files processed. Matching lines appended to {output_file_path}")