🔍 Code Extractor

class DocumentIndexer

Maturity: 51

A class for indexing documents into ChromaDB with support for multiple file formats (PDF, Word, PowerPoint, Excel, text files), smart incremental indexing, and document chunk management.

File: /tf/active/vicechatdev/docchat/document_indexer.py
Lines: 790 - 1144
Complexity: complex

Purpose

DocumentIndexer manages the complete lifecycle of document indexing into a ChromaDB vector database. It handles document processing, chunking, embedding generation, and storage with intelligent features like modification-time tracking for incremental updates, duplicate detection, and batch folder indexing. The class integrates with ChromaDB's HNSW index using cosine similarity for efficient semantic search and supports re-indexing modified documents while skipping unchanged ones.
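Retrieval itself is not implemented by the class; searches go through the ChromaDB collection the indexer creates. A minimal sketch of querying the populated collection via the indexer's collection attribute (the folder path and query text are illustrative):

from pathlib import Path

# Index a folder, then query the same collection handle the indexer created;
# it already carries the DocChat embedding function, so query_texts are embedded consistently.
indexer = DocumentIndexer()
indexer.index_folder(Path('/path/to/documents'))

hits = indexer.collection.query(query_texts=['termination clauses'], n_results=5)
for text, meta in zip(hits['documents'][0], hits['metadatas'][0]):
    print(meta.get('file_name'), meta.get('chunk_index'), text[:80])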

Source Code

class DocumentIndexer:
    """Index documents in ChromaDB"""
    
    def __init__(self, collection_name: str = None, api_key: str = None):
        """
        Initialize document indexer
        
        Args:
            collection_name: Name of ChromaDB collection
            api_key: OpenAI API key
        """
        self.collection_name = collection_name or config.CHROMA_COLLECTION_NAME
        self.api_key = api_key or config.OPENAI_API_KEY
        
        # Initialize ChromaDB client - External connection to oneco_chroma (same pattern as vice_ai)
        logger.info(f"Connecting to ChromaDB at {config.CHROMA_HOST}:{config.CHROMA_PORT}")
        self.chroma_client = chromadb.HttpClient(
            host=config.CHROMA_HOST,
            port=config.CHROMA_PORT
        )
        
        # Initialize embedding function
        self.embedding_function = DocChatEmbeddingFunction(
            api_key=self.api_key,
            embed_model_name=config.EMBEDDING_MODEL,
            llm_model_name=config.SMALL_LLM_MODEL
        )
        
        # Get or create collection with HNSW configuration
        # In ChromaDB 1.3+, HNSW with cosine similarity is the default
        self.collection = self.chroma_client.get_or_create_collection(
            name=self.collection_name,
            embedding_function=self.embedding_function,
            metadata={"hnsw:space": "cosine", "description": "DocChat document collection"}
        )
        
        # Initialize document processor
        self.processor = DocumentProcessor()
        
        logger.info(f"Initialized DocumentIndexer with collection: {self.collection_name}")
    
    def is_document_indexed(self, file_path: Path) -> Optional[Dict[str, Any]]:
        """
        Check if document is already indexed and if it needs re-indexing
        
        Args:
            file_path: Path to document file
            
        Returns:
            Dictionary with doc_id and indexed_mtime if document is indexed, None otherwise
        """
        try:
            # Query for this file path
            results = self.collection.get(
                where={"file_path": str(file_path)},
                limit=1
            )
            
            if not results['ids']:
                return None
            
            # Get metadata from first chunk
            metadata = results['metadatas'][0]
            doc_id = metadata.get('doc_id')
            indexed_mtime = metadata.get('file_mtime')
            
            # Get current file modification time
            current_mtime = os.path.getmtime(file_path)
            
            # Check if file has been modified since indexing
            if indexed_mtime and float(indexed_mtime) >= current_mtime:
                return {
                    'doc_id': doc_id,
                    'indexed_mtime': indexed_mtime,
                    'needs_reindex': False
                }
            else:
                return {
                    'doc_id': doc_id,
                    'indexed_mtime': indexed_mtime,
                    'needs_reindex': True
                }
                
        except Exception as e:
            logger.warning(f"Error checking if document is indexed: {e}")
            return None
    
    def index_document(self, file_path: Path, force_reindex: bool = False) -> Dict[str, Any]:
        """
        Index a single document with smart incremental indexing
        
        Args:
            file_path: Path to document file
            force_reindex: If True, re-index even if already indexed
            
        Returns:
            Dictionary with indexing results
        """
        try:
            # Check if document is already indexed
            existing = self.is_document_indexed(file_path)
            
            if existing and not force_reindex:
                if not existing['needs_reindex']:
                    logger.info(f"Document already indexed and up-to-date: {file_path.name}")
                    return {
                        'success': True,
                        'skipped': True,
                        'doc_id': existing['doc_id'],
                        'file_name': file_path.name,
                        'reason': 'already_indexed'
                    }
                else:
                    # File has been modified, delete old version first
                    logger.info(f"Document modified, re-indexing: {file_path.name}")
                    self.delete_document(existing['doc_id'])
            
            logger.info(f"Indexing document: {file_path}")
            
            # Get file modification time
            file_mtime = os.path.getmtime(file_path)
            
            # Process document
            result = self.processor.process_document(file_path)
            
            if not result.get('success'):
                error_msg = result.get('error', 'Unknown error')
                logger.warning(f"Failed to index {file_path.name}: {error_msg}")
                return result
            
            chunks = result.get('chunks', [])
            if not chunks:
                logger.warning(f"No content extracted from {file_path.name} - likely scanned PDF without text layer")
                return {
                    'success': False, 
                    'error': 'No content extracted - possibly scanned/image PDF',
                    'file_name': file_path.name
                }
            
            # Prepare data for ChromaDB
            documents = []
            metadatas = []
            ids = []
            
            doc_id = str(uuid4())
            
            for i, chunk in enumerate(chunks):
                chunk_id = f"{doc_id}_{i}"
                documents.append(chunk['text'])
                metadatas.append({
                    **chunk['metadata'],
                    'chunk_index': i,
                    'doc_id': doc_id,
                    'chunk_type': chunk['type'],
                    'file_mtime': str(file_mtime)  # Store modification time
                })
                ids.append(chunk_id)
            
            # Add to collection
            try:
                self.collection.add(
                    documents=documents,
                    metadatas=metadatas,
                    ids=ids
                )
                
                logger.info(f"Successfully indexed {len(chunks)} chunks from {file_path.name}")
                return {
                    'success': True,
                    'doc_id': doc_id,
                    'num_chunks': len(chunks),
                    'file_name': file_path.name,
                    'reindexed': existing is not None
                }
                
            except Exception as e:
                logger.error(f"Failed to add to ChromaDB: {e}")
                try:
                    import traceback
                    logger.error(f"Traceback: {traceback.format_exc()}")
                except:
                    logger.error(f"Could not format traceback")
                return {'success': False, 'error': str(e)}
        
        except Exception as e:
            # Catch ANY exception to prevent background thread crash
            logger.error(f"CRITICAL: Unhandled exception while indexing {file_path.name}: {e}")
            try:
                import traceback
                logger.error(f"Traceback: {traceback.format_exc()}")
            except:
                logger.error(f"Could not format traceback")
            return {
                'success': False, 
                'error': f'Unhandled exception: {str(e)}',
                'file_name': file_path.name
            }
    
    def index_folder(self, folder_path: Path, recursive: bool = True, force_reindex: bool = False, 
                     progress_callback=None) -> Dict[str, Any]:
        """
        Index all documents in a folder with smart incremental indexing
        
        Args:
            folder_path: Path to folder
            recursive: Whether to recurse into subdirectories
            force_reindex: If True, re-index all documents even if already indexed
            progress_callback: Optional callback function(current, total, filename) for progress updates
            
        Returns:
            Dictionary with indexing results
        """
        logger.info(f"Indexing folder: {folder_path} (force_reindex={force_reindex})")
        
        results = {
            'success': 0,
            'failed': 0,
            'skipped': 0,
            'reindexed': 0,
            'total': 0,
            'details': []
        }
        
        # Get all supported files
        pattern = '**/*' if recursive else '*'
        
        # First, collect all files to get total count
        files_to_process = []
        for file_path in folder_path.glob(pattern):
            if file_path.is_file():
                ext = file_path.suffix.lower()
                
                # Check if supported extension
                all_extensions = (
                    self.processor.PDF_EXTENSIONS +
                    self.processor.WORD_EXTENSIONS +
                    self.processor.PPT_EXTENSIONS +
                    self.processor.EXCEL_EXTENSIONS +
                    self.processor.TEXT_EXTENSIONS
                )
                
                if ext in all_extensions:
                    files_to_process.append(file_path)
        
        total_files = len(files_to_process)
        results['total'] = total_files
        
        # Process files with progress updates
        for idx, file_path in enumerate(files_to_process, 1):
            # Call progress callback if provided
            if progress_callback:
                progress_callback(idx, total_files, file_path.name)
            
            result = self.index_document(file_path, force_reindex=force_reindex)
            
            if result.get('success'):
                if result.get('skipped'):
                    results['skipped'] += 1
                else:
                    results['success'] += 1
                    if result.get('reindexed'):
                        results['reindexed'] += 1
            else:
                results['failed'] += 1
            
            results['details'].append({
                'file': str(file_path),
                'result': result
            })
        
        # Log summary with details about failed files
        logger.info(
            f"Folder indexing complete. "
            f"New: {results['success'] - results['reindexed']}, "
            f"Re-indexed: {results['reindexed']}, "
            f"Skipped: {results['skipped']}, "
            f"Failed: {results['failed']}"
        )
        
        # Log failed files for review
        if results['failed'] > 0:
            logger.warning(f"=== {results['failed']} files could not be indexed ===")
            for detail in results['details']:
                if not detail['result'].get('success'):
                    error = detail['result'].get('error', 'Unknown error')
                    filename = Path(detail['file']).name
                    logger.warning(f"  - {filename}: {error}")
            logger.warning("Note: Scanned PDFs without text layer require OCR processing")
        
        return results
    
    def get_collection_stats(self) -> Dict[str, Any]:
        """Get statistics about the collection"""
        count = self.collection.count()
        
        # Get unique documents
        if count > 0:
            results = self.collection.get()
            doc_ids = set(meta.get('doc_id') for meta in results['metadatas'])
            file_names = set(meta.get('file_name') for meta in results['metadatas'])
        else:
            doc_ids = set()
            file_names = set()
        
        return {
            'total_chunks': count,
            'total_documents': len(doc_ids),
            'file_names': list(file_names)
        }
    
    def delete_document(self, doc_id: str) -> bool:
        """
        Delete a document and all its chunks
        
        Args:
            doc_id: Document ID
            
        Returns:
            True if successful
        """
        try:
            # Get all chunk IDs for this document
            results = self.collection.get(
                where={"doc_id": doc_id}
            )
            
            if results['ids']:
                self.collection.delete(ids=results['ids'])
                logger.info(f"Deleted document {doc_id} ({len(results['ids'])} chunks)")
                return True
            else:
                logger.warning(f"No chunks found for document {doc_id}")
                return False
                
        except Exception as e:
            logger.error(f"Failed to delete document {doc_id}: {e}")
            return False
    
    def clear_collection(self) -> bool:
        """Clear all documents from collection"""
        try:
            # Delete and recreate collection
            self.chroma_client.delete_collection(name=self.collection_name)
            # In ChromaDB 1.3+, configuration is a dictionary
            self.collection = self.chroma_client.create_collection(
                name=self.collection_name,
                embedding_function=self.embedding_function,
                configuration={"hnsw": {"space": "cosine"}},
                metadata={"description": "DocChat document collection"}
            )
            logger.info("Collection cleared")
            return True
        except Exception as e:
            logger.error(f"Failed to clear collection: {e}")
            return False

Parameters

Name             Type  Default  Kind
collection_name  str   None     keyword
api_key          str   None     keyword

Parameter Details

collection_name: Name of the ChromaDB collection to use for storing document embeddings. If not provided, defaults to config.CHROMA_COLLECTION_NAME. This collection will be created if it doesn't exist.

api_key: OpenAI API key for generating embeddings. If not provided, defaults to config.OPENAI_API_KEY. Required for the embedding function to work.
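Both arguments are optional, so instantiation can rely entirely on config values or override them; a short sketch (the explicit collection name below is hypothetical):

import os

# Fall back to config.CHROMA_COLLECTION_NAME and config.OPENAI_API_KEY
indexer = DocumentIndexer()

# Or target a dedicated collection with an explicitly supplied key
indexer = DocumentIndexer(
    collection_name='contracts_2024',        # hypothetical collection name
    api_key=os.environ['OPENAI_API_KEY']
)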

Return Value

Instantiation returns a DocumentIndexer object with an initialized ChromaDB client, collection, embedding function, and document processor. Methods return dictionaries with operation results:

  • index_document: {'success': bool, 'doc_id': str, 'num_chunks': int, 'skipped': bool, 'reindexed': bool}
  • index_folder: {'success': int, 'failed': int, 'skipped': int, 'reindexed': int, 'total': int, 'details': list}
  • is_document_indexed: {'doc_id': str, 'indexed_mtime': str, 'needs_reindex': bool}, or None if not indexed
  • get_collection_stats: {'total_chunks': int, 'total_documents': int, 'file_names': list}
  • delete_document and clear_collection: bool
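Because a skipped document's result omits num_chunks, callers should branch on the returned flags before reading count fields; a small defensive-handling sketch (file name illustrative):

result = indexer.index_document(Path('report.pdf'))

if not result['success']:
    print(f"Failed: {result.get('error', 'unknown error')}")
elif result.get('skipped'):
    print(f"Already up to date: {result['doc_id']}")
else:
    print(f"Indexed {result['num_chunks']} chunks as {result['doc_id']}")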

Class Interface

Methods

__init__(self, collection_name: str = None, api_key: str = None)

Purpose: Initialize the DocumentIndexer with ChromaDB connection, embedding function, and document processor

Parameters:

  • collection_name: Name of ChromaDB collection (defaults to config.CHROMA_COLLECTION_NAME)
  • api_key: OpenAI API key for embeddings (defaults to config.OPENAI_API_KEY)

Returns: None - initializes instance attributes

is_document_indexed(self, file_path: Path) -> Optional[Dict[str, Any]]

Purpose: Check if a document is already indexed and whether it needs re-indexing based on modification time

Parameters:

  • file_path: Path object pointing to the document file to check

Returns: Dictionary with 'doc_id', 'indexed_mtime', and 'needs_reindex' keys if document exists, None otherwise

index_document(self, file_path: Path, force_reindex: bool = False) -> Dict[str, Any]

Purpose: Index a single document with smart incremental indexing that skips unchanged files and re-indexes modified ones

Parameters:

  • file_path: Path object pointing to the document file to index
  • force_reindex: If True, re-index even if document is already indexed and unchanged

Returns: Dictionary with 'success' (bool), 'doc_id' (str), 'num_chunks' (int), 'file_name' (str), 'skipped' (bool), 'reindexed' (bool), and 'error' (str) if failed

index_folder(self, folder_path: Path, recursive: bool = True, force_reindex: bool = False, progress_callback=None) -> Dict[str, Any]

Purpose: Index all supported documents in a folder with optional recursion and progress tracking

Parameters:

  • folder_path: Path object pointing to the folder containing documents
  • recursive: If True, recursively process subdirectories
  • force_reindex: If True, re-index all documents even if unchanged
  • progress_callback: Optional callback function(current, total, filename) called for each file processed

Returns: Dictionary with 'success' (int), 'failed' (int), 'skipped' (int), 'reindexed' (int), 'total' (int), and 'details' (list) containing per-file results
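The details list carries one entry per file, which makes it straightforward to report or retry failures after a batch run; a minimal sketch (folder path illustrative):

from pathlib import Path

summary = indexer.index_folder(Path('/path/to/documents'), recursive=True)

# Collect and report the files that failed
failed = [d for d in summary['details'] if not d['result'].get('success')]
for entry in failed:
    print(f"{entry['file']}: {entry['result'].get('error', 'unknown error')}")

# Optionally attempt a forced second pass; scanned PDFs without a text layer
# will keep failing until OCR is applied.
for entry in failed:
    indexer.index_document(Path(entry['file']), force_reindex=True)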

get_collection_stats(self) -> Dict[str, Any]

Purpose: Get statistics about the ChromaDB collection including chunk count and unique documents

Returns: Dictionary with 'total_chunks' (int), 'total_documents' (int), and 'file_names' (list of str)

delete_document(self, doc_id: str) -> bool

Purpose: Delete a document and all its chunks from the collection

Parameters:

  • doc_id: Unique document identifier (UUID string) to delete

Returns: True if deletion successful, False otherwise

clear_collection(self) -> bool

Purpose: Delete and recreate the entire collection, removing all documents

Returns: True if successful, False otherwise
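One pattern built from these two methods is a full rebuild: clear the collection, then re-index the source folder from scratch (folder path illustrative):

if indexer.clear_collection():
    indexer.index_folder(Path('/path/to/documents'), recursive=True, force_reindex=True)
else:
    print('Could not clear collection; aborting rebuild')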

Attributes

Name Type Description Scope
collection_name str Name of the ChromaDB collection being used instance
api_key str OpenAI API key for generating embeddings instance
chroma_client chromadb.HttpClient ChromaDB HTTP client connected to the configured server instance
embedding_function DocChatEmbeddingFunction Embedding function instance for generating document embeddings instance
collection chromadb.Collection ChromaDB collection object with HNSW index and cosine similarity instance
processor DocumentProcessor Document processor instance for extracting text and chunks from various file formats instance

Dependencies

  • chromadb
  • openai
  • tiktoken
  • numpy
  • llmsherpa
  • PyPDF2
  • python-pptx
  • openpyxl
  • pandas
  • python-docx
  • pdf2image
  • Pillow
  • pytesseract
  • easyocr
  • pathlib
  • uuid
  • logging
  • os
  • tempfile
  • subprocess
  • threading
  • warnings
  • sys
  • traceback

Required Imports

import os
import logging
from pathlib import Path
from uuid import uuid4
from typing import Dict, Any, Optional
import chromadb
import config

Conditional/Optional Imports

These imports are only needed under specific conditions:

from llmsherpa.readers import LayoutPDFReader
  Condition: used by DocumentProcessor for PDF processing
  Status: Required (conditional)

import PyPDF2
  Condition: used by DocumentProcessor for PDF text extraction
  Status: Required (conditional)

import pptx
  Condition: used by DocumentProcessor for PowerPoint files
  Status: Required (conditional)

import openpyxl
  Condition: used by DocumentProcessor for Excel files
  Status: Required (conditional)

from docx import Document as DocxDocument
  Condition: used by DocumentProcessor for Word documents
  Status: Required (conditional)

from pdf2image import convert_from_path
  Condition: used for OCR processing of scanned PDFs
  Status: Optional

import pytesseract
  Condition: used for OCR text extraction
  Status: Optional

import easyocr
  Condition: alternative OCR engine
  Status: Optional

Usage Example

# Initialize the indexer
indexer = DocumentIndexer(
    collection_name='my_documents',
    api_key='sk-...'
)

# Check collection stats
stats = indexer.get_collection_stats()
print(f"Total documents: {stats['total_documents']}")

# Index a single document
from pathlib import Path
result = indexer.index_document(Path('document.pdf'))
if result['success']:
    doc_id = result['doc_id']
    # 'num_chunks' is only present when the file was actually (re-)indexed
    if not result.get('skipped'):
        print(f"Indexed {result['num_chunks']} chunks")

# Index entire folder with progress tracking
def progress_callback(current, total, filename):
    print(f"Processing {current}/{total}: {filename}")

folder_result = indexer.index_folder(
    Path('/path/to/documents'),
    recursive=True,
    force_reindex=False,
    progress_callback=progress_callback
)
print(f"Success: {folder_result['success']}, Failed: {folder_result['failed']}")

# Check if document needs re-indexing
file_path = Path('document.pdf')
status = indexer.is_document_indexed(file_path)
if status and status['needs_reindex']:
    print("Document has been modified, re-indexing...")
    indexer.index_document(file_path, force_reindex=True)

# Delete a document
indexer.delete_document(doc_id)

# Clear entire collection
indexer.clear_collection()

Best Practices

  • Always check is_document_indexed() before indexing to avoid duplicate work unless force_reindex=True is needed
  • Use progress_callback with index_folder() for long-running batch operations to provide user feedback
  • Handle failed indexing gracefully - the class returns detailed error information in result dictionaries
  • Scanned PDFs without text layers will fail to index unless OCR is configured - check error messages for 'No content extracted'
  • The class maintains file modification times (mtime) to detect changes - ensure file system timestamps are reliable
  • ChromaDB connection is established in __init__ - ensure the server is running before instantiation
  • Document chunks are stored with metadata including doc_id, chunk_index, file_path, and file_mtime for tracking
  • index_document() automatically deletes a modified file's old chunks before re-indexing; call delete_document() manually only when removing documents outside that flow, to avoid orphaned chunks
  • Collection uses HNSW index with cosine similarity - appropriate for semantic similarity search
  • The class is thread-safe for indexing operations but ChromaDB client connections should not be shared across processes
  • Large folder indexing operations can be memory-intensive - consider processing in batches for very large document sets (see the batching sketch after this list)
  • Failed files are logged with detailed error messages - review logs to identify problematic documents
  • The embedding_function is initialized once and reused - do not modify after initialization
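For very large document sets, one way to bound memory use is to walk the tree yourself and feed index_document() in fixed-size batches instead of making a single index_folder() call. A sketch of that idea; the batch size and extension set below are assumptions, whereas the real class derives supported extensions from its DocumentProcessor:

from pathlib import Path

BATCH_SIZE = 200                                            # assumed batch size
EXTENSIONS = {'.pdf', '.docx', '.pptx', '.xlsx', '.txt'}    # illustrative subset

files = [p for p in Path('/path/to/documents').rglob('*')
         if p.is_file() and p.suffix.lower() in EXTENSIONS]

for start in range(0, len(files), BATCH_SIZE):
    batch = files[start:start + BATCH_SIZE]
    for file_path in batch:
        indexer.index_document(file_path)
    print(f"Indexed batch {start // BATCH_SIZE + 1} ({len(batch)} files)")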

Similar Components

AI-powered semantic similarity - components with related functionality:

  • function get_document_info 66.1% similar

    Retrieves indexing status and metadata for a document, including whether it's indexed, its document ID, chunk count, and reindexing status.

    From: /tf/active/vicechatdev/docchat/app.py
  • function index_documents_example 61.8% similar

    A demonstration function that indexes documents from a specified folder using a DocumentIndexer, creating the folder if it doesn't exist, and displays indexing results and collection statistics.

    From: /tf/active/vicechatdev/docchat/example_usage.py
  • class MyEmbeddingFunction_v1 60.9% similar

    A custom embedding function class that generates embeddings for documents using OpenAI's API, with built-in text summarization for long documents and token management.

    From: /tf/active/vicechatdev/OneCo_hybrid_RAG copy.py
  • class DocumentProcessor_v7 60.5% similar

    Process different document types for indexing

    From: /tf/active/vicechatdev/docchat/document_indexer.py
  • class DocChatEmbeddingFunction 59.7% similar

    A custom ChromaDB embedding function that generates OpenAI embeddings with automatic text summarization for documents exceeding token limits.

    From: /tf/active/vicechatdev/docchat/document_indexer.py