function main_v1
Main execution function that processes and copies document files from an output directory to target folders based on document codes, with support for dry-run and test modes.
/tf/active/vicechatdev/mailsearch/copy_signed_documents.py
119 - 253
complex
Purpose
This function orchestrates a document file copying workflow by: (1) loading document metadata from a CSV file and email dates from a register, (2) filtering documents that need copying based on status and WUXI coding in filenames, (3) finding appropriate target folders for each document, (4) generating new filenames, (5) copying files while preserving email dates as file modification times, and (6) providing detailed progress reporting and error handling. It supports dry-run mode for validation and test mode for processing a limited subset of documents.
Source Code
def main(dry_run=True, test_mode=False, test_count=3):
    """
    Copy WUXI-coded documents from OUTPUT_DIR into their best-matching folders.

    Rows are read from COMPARISON_CSV; only rows whose status is
    'PRESENT BUT UNSIGNED' or 'ABSENT' and whose output filename passes
    has_wuxi_coding() are considered. For each such row the target folder is
    looked up with find_best_folder(), a new name is built with
    create_new_filename(), and the file is copied with copy_file_with_date()
    using the date found in the download register (current time if absent).

    A dry run follows exactly the same decisions as a real run (including
    skipping files whose destination already exists) but never copies.

    Args:
        dry_run: If True, only print what would be done without copying
        test_mode: If True, only process first few documents
        test_count: Number of documents to process in test mode
            (negative values are treated as 0)

    Returns:
        Tuple (success_count, error_count). success_count is always 0 in a
        dry run; documents skipped because the destination already exists
        are counted in neither value.
    """
    print("="*80)
    if dry_run:
        print("DRY RUN MODE - No files will be copied")
    elif test_mode:
        print(f"TEST MODE - Will copy {test_count} documents")
    else:
        print("FULL RUN MODE - Will copy all documents")
    print("="*80)

    # Load data
    print("\nLoading data...")
    # newline='' lets the csv module handle quoted fields with embedded newlines
    with open(COMPARISON_CSV, 'r', encoding='utf-8', newline='') as f:
        results = list(csv.DictReader(f))
    email_dates = load_email_dates()
    print(f"Loaded {len(email_dates)} file dates from download register")

    # Filter documents to copy (only those with wuxi coding in filename)
    copy_statuses = ('PRESENT BUT UNSIGNED', 'ABSENT')
    all_candidates = [r for r in results if r['status'] in copy_statuses]
    docs_to_copy = [r for r in all_candidates if has_wuxi_coding(r['output_filename'])]
    print(f"\nFound {len(all_candidates)} total documents needing copy")
    print(f"Filtered to {len(docs_to_copy)} documents with WUXI coding in filename:")
    print(f" - PRESENT BUT UNSIGNED: {sum(1 for r in docs_to_copy if r['status'] == 'PRESENT BUT UNSIGNED')}")
    print(f" - ABSENT: {sum(1 for r in docs_to_copy if r['status'] == 'ABSENT')}")
    print(f" - Excluded (no WUXI coding): {len(all_candidates) - len(docs_to_copy)}")

    if test_mode:
        # Clamp at 0: a negative slice bound would select "all but the last N"
        docs_to_copy = docs_to_copy[:max(test_count, 0)]
        # Report the number actually selected, which may be below test_count
        print(f"\nTest mode: Processing first {len(docs_to_copy)} documents")

    # Process documents
    print("\n" + "="*80)
    print("Processing documents:")
    print("="*80)
    success_count = 0
    skipped_count = 0
    error_count = 0
    errors = []

    for i, doc in enumerate(docs_to_copy, 1):
        code = doc['document_code']
        filename = doc['output_filename']
        status = doc['status']
        print(f"\n{i}/{len(docs_to_copy)}. {code} - {filename[:60]}")
        print(f" Status: {status}")

        # Per-document boundary: one failing document must not abort the batch
        try:
            # Find best folder
            best_folder, match_score = find_best_folder(code)
            if not best_folder:
                print(" ✗ ERROR: No matching folder found")
                error_count += 1
                errors.append((code, "No matching folder found"))
                continue
            rel_folder = os.path.relpath(best_folder, WUXI2_ROOT)
            print(f" Target folder: {rel_folder}")
            print(f" Match score: {match_score} parts")

            # Generate new filename
            new_filename = create_new_filename(filename)
            print(f" New filename: {new_filename}")

            # Get source and destination paths
            src_path = os.path.join(OUTPUT_DIR, filename)
            dst_path = os.path.join(best_folder, new_filename)
            if not os.path.exists(src_path):
                print(" ✗ ERROR: Source file not found")
                error_count += 1
                errors.append((code, "Source file not found"))
                continue

            # Never overwrite: skip existing destinations. The dry run takes
            # the same branch so its report matches what a real run would do.
            if os.path.exists(dst_path):
                print(" ⚠ WARNING: Destination file already exists")
                if dry_run:
                    print(" → Would skip (file exists)")
                else:
                    print(" → Skipping (file exists)")
                skipped_count += 1
                continue

            # Get email date
            email_date = email_dates.get(filename)
            if email_date:
                print(f" Email date: {email_date.strftime('%Y-%m-%d')}")
            else:
                print(" ⚠ Email date not found, using current date")
                email_date = datetime.now()

            # Copy file
            if dry_run:
                print(f" → Would copy to: {os.path.relpath(dst_path, '/tf/active')}")
            else:
                copy_file_with_date(src_path, dst_path, email_date)
                print(" ✓ Copied successfully")
                success_count += 1
        except Exception as e:
            print(f" ✗ ERROR: {e}")
            error_count += 1
            errors.append((code, str(e)))

    # Summary
    print("\n" + "="*80)
    print("SUMMARY")
    print("="*80)
    print(f"Total documents processed: {len(docs_to_copy)}")
    if dry_run:
        print("(Dry run - no files were copied)")
    else:
        print(f"Successfully copied: {success_count}")
    print(f"Skipped (destination exists): {skipped_count}")
    # Errors are reported in dry runs too: surfacing them is the point of validating
    print(f"Errors: {error_count}")
    if errors:
        print("\nErrors encountered:")
        for code, error in errors[:10]:  # Show first 10 errors
            print(f" - {code}: {error}")
        if len(errors) > 10:
            print(f" ... and {len(errors) - 10} more")

    return success_count, error_count
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| dry_run | - | True | positional_or_keyword |
| test_mode | - | False | positional_or_keyword |
| test_count | - | 3 | positional_or_keyword |
Parameter Details
dry_run: Boolean flag that when True (default), simulates the file copying process without actually copying files. Useful for validating the workflow and seeing what would happen. When False, files are actually copied to their destinations.
test_mode: Boolean flag that when True, limits processing to only the first few documents specified by test_count. When False (default), processes all eligible documents. Useful for testing the workflow on a small subset before full execution.
test_count: Integer specifying how many documents to process when test_mode is True. Default is 3. Only relevant when test_mode=True, otherwise ignored.
Return Value
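Returns a tuple of two integers: (success_count, error_count). success_count is the number of files successfully copied (always 0 in dry_run mode), and error_count is the number of documents that encountered errors during processing. Documents skipped because the destination file already exists are counted in neither value, so success_count + error_count can be smaller than the number of documents processed. These counts allow the caller to assess the success rate of the operation.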
Dependencies
csv, os, shutil, datetime, re, sys, argparse
Required Imports
import csv
import os
import shutil
from datetime import datetime
import re
import sys
import argparse
Usage Example
# Dry run to see what would happen.
# Nothing is copied in a dry run, so the first returned value is always 0;
# only the error count is meaningful here.
_, errors = main(dry_run=True, test_mode=False)
print(f'Dry run finished with {errors} errors')
# Test mode to process first 5 documents
success, errors = main(dry_run=False, test_mode=True, test_count=5)
print(f'Copied {success} files, {errors} errors')
# Full production run
success, errors = main(dry_run=False, test_mode=False)
if errors == 0:
    # Files whose destination already existed are skipped, not counted as errors
    print(f'Completed without errors ({success} files copied)')
else:
    # success + errors is not the total: skipped documents are in neither count
    print(f'Completed with {errors} errors ({success} files copied)')
Best Practices
- Always run with dry_run=True first to validate the workflow before actually copying files
- Use test_mode=True with a small test_count when testing changes to avoid processing large batches
- Ensure all required global variables (COMPARISON_CSV, OUTPUT_DIR, WUXI2_ROOT) are properly configured before calling
- Check that all helper functions (load_email_dates, has_wuxi_coding, find_best_folder, create_new_filename, copy_file_with_date) are defined and working
- Monitor the error_count return value and review error messages for documents that failed to process
- The function skips files that already exist at the destination to avoid overwriting
- Email dates are used for file modification times; if not found, current date is used as fallback
- Only documents with status 'PRESENT BUT UNSIGNED' or 'ABSENT' and containing WUXI coding are processed
- Review the summary output to understand what was processed and identify any issues
Tags
Similar Components
AI-powered semantic similarity - components with related functionality: