class EInkLLMProcessor
Maturity: 26
Main processor class that handles the complete workflow
File:
/tf/active/vicechatdev/e-ink-llm/processor.py
Lines:
61 - 455
Complexity:
moderate
Purpose
Main processor class that handles the complete workflow
Source Code
class EInkLLMProcessor:
    """Main processor class that handles the complete workflow"""

    def __init__(self, api_key: Optional[str] = None, watch_folder: Optional[str] = None,
                 conversation_id: Optional[str] = None, compact_mode: bool = True,
                 auto_detect_session: bool = True, enable_multi_page: bool = True,
                 max_pages: int = 50, enable_editing_workflow: bool = True,
                 enable_hybrid_mode: bool = True):
        """Wire together all pipeline components and print the startup banner.

        Args:
            api_key: Optional API key forwarded to the LLM handlers.
            watch_folder: Folder to monitor; defaults to ./watch under the CWD.
            conversation_id: Existing conversation to resume; when omitted a new
                conversation is created via the session manager.
            compact_mode: Run LLM responses through the compact formatter.
            auto_detect_session: Try to infer the conversation from input files.
            enable_multi_page: Enable multi-page PDF extraction and analysis.
            max_pages: Upper bound on pages processed per document.
            enable_editing_workflow: Enable annotation/editing-workflow detection.
            enable_hybrid_mode: Enable text+graphics output; silently downgraded
                when the optional dependencies (HYBRID_AVAILABLE) are missing.
        """
        # Initialize components
        self.input_processor = InputProcessor(enable_multi_page=enable_multi_page, max_pages=max_pages)
        self.llm_handler = LLMHandler(api_key)
        # None signals single-page-only processing downstream (see process_file).
        self.multi_page_handler = MultiPageLLMHandler(api_key) if enable_multi_page else None
        self.pdf_generator = PDFGenerator()
        self.session_manager = SessionManager()
        self.compact_formatter = CompactResponseFormatter()
        self.session_detector = SessionDetector()
        self.conversation_context = ConversationContextManager(self.session_manager)
        self.editing_workflow = EditingWorkflowHandler(self.llm_handler) if enable_editing_workflow else None

        # Initialize hybrid response handler if available and enabled.
        # Effective flag is request AND availability, so callers can always pass True.
        self.hybrid_handler = None
        self.enable_hybrid_mode = enable_hybrid_mode and HYBRID_AVAILABLE
        if self.enable_hybrid_mode:
            self.hybrid_handler = HybridResponseHandler(api_key)
            print(f"šØ Hybrid mode enabled (text + graphics)")
        elif enable_hybrid_mode and not HYBRID_AVAILABLE:
            print(f"ā ļø Hybrid mode requested but dependencies not available")

        # Configuration
        self.compact_mode = compact_mode
        self.auto_detect_session = auto_detect_session
        self.enable_multi_page = enable_multi_page
        self.max_pages = max_pages
        self.enable_editing_workflow = enable_editing_workflow

        # Session management: resume the supplied conversation or start fresh.
        if conversation_id:
            self.conversation_id = conversation_id
        else:
            self.conversation_id = self.session_manager.create_conversation()

        # Set up watch folder (created if missing; parent must already exist)
        self.watch_folder = Path(watch_folder) if watch_folder else Path.cwd() / "watch"
        self.watch_folder.mkdir(exist_ok=True)

        # Set up logging
        self.setup_logging()

        # Startup banner summarizing the effective configuration.
        print(f"šÆ E-Ink LLM Processor initialized")
        print(f"š Conversation ID: {self.conversation_id}")
        print(f"š Watch folder: {self.watch_folder.absolute()}")
        print(f"š¤ Models: {self.llm_handler.small_model} (preprocessing), {self.llm_handler.main_model} (main)")
        print(f"š Compact mode: {'ON' if self.compact_mode else 'OFF'}")
        print(f"šØ Hybrid mode: {'ON' if self.enable_hybrid_mode else 'OFF'} (text + graphics)")
        print(f"š Auto-detect sessions: {'ON' if self.auto_detect_session else 'OFF'}")
        print(f"š Multi-page PDFs: {'ON' if self.enable_multi_page else 'OFF'} (max {self.max_pages} pages)")
        print(f"š¾ Session tracking: {self.session_manager.db_path}")
def setup_logging(self):
"""Set up logging for the application"""
log_file = self.watch_folder / "eink_llm.log"
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
async def process_file(self, file_path: Path) -> Optional[Path]:
"""
Process a single file and generate response PDF
Args:
file_path: Path to input file
Returns:
Path to generated response PDF or None if failed
"""
start_time = time.time()
try:
# Auto-detect session information if enabled
detected_session = None
if self.auto_detect_session:
detected_session = detect_session_from_file(str(file_path))
if detected_session:
if detected_session.confidence >= 0.7: # High confidence threshold
print(f"š Auto-detected session: {detected_session.conversation_id} "
f"(exchange #{detected_session.exchange_number}, "
f"confidence: {detected_session.confidence:.2f})")
# Update conversation ID to detected one
self.conversation_id = detected_session.conversation_id
print(f"š Switched to detected conversation: {self.conversation_id}")
else:
print(f"š Detected session with low confidence ({detected_session.confidence:.2f}), "
f"continuing with current conversation")
print(f"\n{'='*60}")
print(f"š PROCESSING: {file_path.name}")
print(f"š Conversation: {self.conversation_id}")
if detected_session and detected_session.confidence >= 0.7:
print(f"šÆ Auto-continuation from exchange #{detected_session.exchange_number}")
print(f"{'='*60}")
# Step 1: Extract image(s) from input
print(f"šø Step 1: Extracting image from {file_path.suffix} file...")
extraction_result = self.input_processor.extract_image(str(file_path))
# Handle both single-page and multi-page results
if isinstance(extraction_result[0], list):
# Multi-page PDF
page_images, metadata = extraction_result
is_multi_page = True
print(f"ā
Multi-page extraction successful")
print(f" ⢠Total pages: {metadata.get('total_pages', len(page_images))}")
print(f" ⢠Processed pages: {len(page_images)}")
print(f" ⢠Content pages: {metadata.get('content_pages', 'Unknown')}")
print(f" ⢠Total text length: {metadata.get('total_text_length', 0):,} chars")
else:
# Single page
image_b64, metadata = extraction_result
is_multi_page = False
print(f"ā
Image extracted successfully")
print(f" ⢠Dimensions: {metadata.get('dimensions', 'Unknown')}")
print(f" ⢠Source type: {metadata.get('source_type', 'Unknown')}")
# Step 2: Get conversation context
conversation_context = self.session_manager.get_conversation_context(self.conversation_id)
if conversation_context:
print(f"š¬ Using conversation context ({len(conversation_context)} chars)")
metadata['conversation_context'] = conversation_context
# Add detected session context if available
if detected_session and detected_session.confidence >= 0.7:
metadata['continuation_note'] = (
f"This appears to be a follow-up to exchange #{detected_session.exchange_number} "
f"in conversation {detected_session.conversation_id}. "
f"Please provide a response that builds upon the previous conversation."
)
# Step 3: Process with AI
if is_multi_page and self.multi_page_handler:
print(f"š§ Step 3: Processing multi-page document with AI...")
# Get pages from multi-page processor
pages, _ = self.input_processor.multi_page_processor.extract_all_pages(file_path)
# Analyze with multi-page handler
multi_result = await self.multi_page_handler.analyze_multi_page_document(
pages, metadata, conversation_context
)
llm_response = multi_result.combined_response
print(f"ā
Multi-page AI processing complete")
print(f" ⢠Pages analyzed: {multi_result.processing_stats['pages_processed']}")
print(f" ⢠Analysis method: {', '.join(multi_result.processing_stats['analysis_methods'])}")
print(f" ⢠Document type: {multi_result.document_summary.document_type}")
print(f" ⢠Response length: {len(llm_response):,} characters")
# Use first page image for PDF generation
image_b64 = page_images[0] if page_images else ""
else:
print(f"š§ Step 3: Processing with AI...")
# Get enhanced context using conversation context manager
enhanced_prompt = self.conversation_context.enhance_prompt_with_context(
base_prompt="", # Will be handled by LLM handler
conversation_id=self.conversation_id,
session_manager=self.session_manager
)
# Add enhanced context to metadata
if enhanced_prompt:
metadata['enhanced_context'] = enhanced_prompt
print(f" ⢠Enhanced with conversation context")
llm_response = await self.llm_handler.analyze_and_respond(image_b64, metadata)
print(f"ā
AI processing complete ({len(llm_response):,} characters)")
# Step 3.5: Process editing workflow if enabled
editing_workflow_result = None
if self.editing_workflow:
print(f"āļø Step 3.5: Processing editing workflow...")
editing_workflow_result = await self.editing_workflow.process_document_for_editing(
file_path, image_b64, llm_response
)
if editing_workflow_result:
print(f" ⢠Detected {editing_workflow_result.annotations_detected} annotations")
print(f" ⢠Confidence: {editing_workflow_result.confidence_score:.2f}")
if editing_workflow_result.rewritten_content:
print(f" ⢠Generated rewritten content ({len(editing_workflow_result.rewritten_content):,} chars)")
# Add editing workflow results to metadata for PDF generation
metadata['editing_workflow'] = {
'annotations_detected': editing_workflow_result.annotations_detected,
'confidence_score': editing_workflow_result.confidence_score,
'recommendations': editing_workflow_result.recommendations,
'rewritten_content': editing_workflow_result.rewritten_content
}
else:
print(f" ⢠No annotations detected or workflow failed")
# Step 4: Apply compact formatting if enabled
final_response = llm_response
if self.compact_mode:
print(f"šÆ Step 4: Applying compact formatting...")
compact_response = self.compact_formatter.parse_llm_response_to_compact(llm_response)
if compact_response:
final_response = compact_response
print(f" ⢠Compressed: {len(llm_response)} ā {len(compact_response)} chars ({len(compact_response)/len(llm_response)*100:.0f}%)")
else:
print(f" ⢠Compact formatting failed, using original")
# Step 5: Generate output PDF with session-aware filename
print(f"š Step 5: Generating response PDF...")
# Get current exchange number
conversation = self.session_manager.get_conversation(self.conversation_id)
next_exchange_num = (conversation.total_exchanges + 1) if conversation else 1
output_filename = self.session_manager.generate_session_filename(
self.conversation_id, next_exchange_num, file_path.name
)
output_path = file_path.parent / output_filename
# Enable hybrid mode in metadata for this processing
if self.enable_hybrid_mode:
metadata['enable_hybrid_mode'] = True
# Use hybrid response handler if available and response contains graphics
if (self.enable_hybrid_mode and self.hybrid_handler and
'[GRAPHIC:' in final_response):
print(f" šØ Using hybrid mode (text + graphics)")
generated_pdf = await self.hybrid_handler.process_hybrid_response(
llm_response=final_response,
metadata=metadata,
output_path=str(output_path),
conversation_id=self.conversation_id,
exchange_number=next_exchange_num
)
else:
# Use standard PDF generation
if self.enable_hybrid_mode and '[GRAPHIC:' in final_response:
print(f" ā ļø Graphics detected but hybrid handler not available, using standard mode")
self.pdf_generator.create_response_pdf(
llm_response=final_response,
original_image_b64=image_b64,
metadata=metadata,
output_path=str(output_path),
conversation_id=self.conversation_id,
exchange_number=next_exchange_num
)
# Step 6: Record exchange in session
processing_time = time.time() - start_time
usage_stats = self.llm_handler.get_usage_summary()
exchange_id = self.session_manager.add_exchange(
conversation_id=self.conversation_id,
input_file=str(file_path),
input_type=file_path.suffix,
response_text=final_response,
processing_time=processing_time,
tokens_used=usage_stats['total_tokens_used'],
metadata={
'dimensions': metadata.get('dimensions'),
'source_type': metadata.get('source_type'),
'compact_mode': self.compact_mode,
'original_response_length': len(llm_response),
'final_response_length': len(final_response)
}
)
# Log success
print(f"\nš SUCCESS! Processing completed in {processing_time:.1f} seconds")
print(f"š Response saved: {output_path.name}")
print(f"š Exchange ID: {exchange_id}")
# Log usage statistics
print(f"š Usage: {usage_stats['total_tokens_used']} tokens, ~${usage_stats['total_cost_estimate']:.3f}")
self.logger.info(f"Successfully processed {file_path.name} -> {output_path.name} "
f"({processing_time:.1f}s, {usage_stats['total_tokens_used']} tokens, {exchange_id})")
return output_path
except Exception as e:
error_msg = f"Error processing {file_path.name}: {str(e)}"
print(f"\nā ERROR: {error_msg}")
self.logger.error(error_msg)
# Generate error PDF with session-aware filename
try:
conversation = self.session_manager.get_conversation(self.conversation_id)
next_exchange_num = (conversation.total_exchanges + 1) if conversation else 1
error_filename = self.session_manager.generate_session_filename(
self.conversation_id, next_exchange_num, file_path.name, is_error=True
)
error_output_path = file_path.parent / error_filename
self.pdf_generator.generate_error_pdf(
error_message=str(e),
original_file=str(file_path),
output_path=str(error_output_path),
conversation_id=self.conversation_id,
exchange_number=next_exchange_num
)
print(f"š Error report saved: {error_output_path.name}")
# Record error exchange
processing_time = time.time() - start_time
self.session_manager.add_exchange(
conversation_id=self.conversation_id,
input_file=str(file_path),
input_type=file_path.suffix,
response_text=f"ERROR: {str(e)}",
processing_time=processing_time,
tokens_used=0,
metadata={'error': True, 'error_message': str(e)}
)
return error_output_path
except Exception as pdf_error:
print(f"ā Failed to generate error PDF: {pdf_error}")
return None
async def process_existing_files(self):
"""Process any existing files in the watch folder"""
print(f"š Checking for existing files in {self.watch_folder}...")
existing_files = [
f for f in self.watch_folder.iterdir()
if f.is_file() and InputProcessor.is_supported_file(f) and not f.name.startswith(('RESPONSE_', 'ERROR_'))
]
if existing_files:
print(f"š Found {len(existing_files)} existing file(s) to process")
for file_path in existing_files:
await self.process_file(file_path)
else:
print(f"š No existing files found")
    async def start_watching(self, process_existing: bool = True):
        """
        Start watching the folder for new files.

        Prints a startup banner, optionally drains pre-existing files, then
        runs a watchdog Observer until interrupted with Ctrl+C, at which
        point it prints a cumulative usage summary and shuts down.

        Args:
            process_existing: Whether to process existing files on startup
        """
        print(f"\nšÆ Starting E-Ink LLM File Processor")
        print(f"š Watching folder: {self.watch_folder.absolute()}")
        print(f"š Supported formats: PDF, JPG, JPEG, PNG, GIF, BMP, TIFF, WEBP")
        print(f"š” Place files in the watch folder to process them automatically")
        print(f"š Responses will be saved with conversation tracking")
        print(f"\n{'='*60}")

        # Process existing files if requested
        if process_existing:
            await self.process_existing_files()

        # Set up file system watcher.
        # NOTE(review): EInkFileHandler is a project class not visible here;
        # presumably it dispatches new-file events back into this processor —
        # confirm against its definition.
        event_handler = EInkFileHandler(self)
        observer = Observer()
        observer.schedule(event_handler, str(self.watch_folder), recursive=False)

        # Start watching (the Observer runs on its own thread; this coroutine
        # merely keeps the event loop alive below).
        observer.start()
        print(f"šļø File watcher started. Monitoring for new files...")
        print(f"š¾ Logs are saved to: {self.watch_folder / 'eink_llm.log'}")
        print(f"š Press Ctrl+C to stop")

        try:
            # Idle loop: wake once a second so Ctrl+C can be serviced.
            while True:
                await asyncio.sleep(1)
        except KeyboardInterrupt:
            # NOTE(review): when run under asyncio.run, Ctrl+C may surface as a
            # cancellation at the runner level rather than KeyboardInterrupt
            # here — confirm this handler is reached in practice.
            print(f"\nš Stopping file watcher...")
            observer.stop()

            # Print final usage summary
            usage_stats = self.llm_handler.get_usage_summary()
            print(f"\nš FINAL USAGE SUMMARY:")
            print(f" ⢠Preprocessing calls: {usage_stats['preprocessing_calls']}")
            print(f" ⢠Main processing calls: {usage_stats['main_processing_calls']}")
            print(f" ⢠Total tokens used: {usage_stats['total_tokens_used']:,}")
            print(f" ⢠Estimated cost: ${usage_stats['total_cost_estimate']:.3f}")

            # Wait for the observer thread to finish before declaring shutdown.
            observer.join()
            print(f"ā File watcher stopped")
Parameters (constructor)
| Name | Type | Default | Kind |
|---|---|---|---|
| api_key | Optional[str] | None | keyword |
| watch_folder | Optional[str] | None | keyword |
| conversation_id | Optional[str] | None | keyword |
| compact_mode | bool | True | keyword |
| auto_detect_session | bool | True | keyword |
| enable_multi_page | bool | True | keyword |
| max_pages | int | 50 | keyword |
| enable_editing_workflow | bool | True | keyword |
| enable_hybrid_mode | bool | True | keyword |
Parameter Details
api_key: optional API key forwarded to the LLM handlers. watch_folder: folder to monitor for input files (defaults to ./watch under the current directory). conversation_id: an existing conversation to resume; a new conversation is created when omitted. The remaining boolean/int flags toggle compact formatting, session auto-detection, multi-page PDF handling (bounded by max_pages), the editing workflow, and hybrid text+graphics output.
Return Value
The constructor returns None. `process_file` returns the Path of the generated response PDF (or error PDF), or None if processing and error reporting both fail.
Class Interface
Methods
__init__(self, api_key, watch_folder, conversation_id, compact_mode, auto_detect_session, enable_multi_page, max_pages, enable_editing_workflow, enable_hybrid_mode)
Purpose: Internal method: init
Parameters:
api_key: Type: Optional[str]watch_folder: Type: Optional[str]conversation_id: Type: Optional[str]compact_mode: Type: boolauto_detect_session: Type: boolenable_multi_page: Type: boolmax_pages: Type: intenable_editing_workflow: Type: boolenable_hybrid_mode: Type: bool
Returns: None
setup_logging(self)
Purpose: Set up logging for the application
Returns: None
Required Imports
import os
import asyncio
import time
from pathlib import Path
from watchdog.observers import Observer
Usage Example
# Example usage:
# processor = EInkLLMProcessor(api_key="sk-...", watch_folder="./watch")
# asyncio.run(processor.start_watching())
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
-
class InvoiceProcessor 65.9% similar
-
class RemarkableEInkProcessor 63.3% similar
-
class OneDriveProcessor 56.7% similar
-
class DataProcessor 56.4% similar
-
class DataProcessor_v1 56.1% similar