class OneCo_hybrid_RAG_v2
A class named OneCo_hybrid_RAG
Path: /tf/active/vicechatdev/OneCo_hybrid_RAG.py
Lines: 1170 - 4142
Complexity: moderate
Purpose
Hybrid retrieval-augmented generation (RAG) chat pipeline that combines Neo4j graph queries, ChromaDB vector retrieval, optional Serper web search, and OpenAI/Azure chat models behind a Panel chat interface, with configurable flow control, reference management, and instruction templates.
Source Code
class OneCo_hybrid_RAG():
def __init__(self):
## Set API keys
self.set_api_keys()
## Define the flow control variables to be exposed and set default values
self.flow_control = {
"pre_model" : ["OpenAi","gpt-4o-mini",0],
"model" : ["OpenAi","gpt-4o",0],
"search_engine" : ["Serper","google"],
"enable_search" : False,
"enable_memory" : False,
"memory_max_size" : 3,
"enable_referencing" : True,
"enable_extensive_search" : False, # New extensive search option
"extensive_search_chunks" : 100, # Number of chunks for extensive search
"enable_keyword_filtering" : False, # Keyword filtering option (off by default)
"target_summary_tokens" : 8000, # Target tokens for comprehensive summary
"detail_level" : "Balanced", # Detail level: Summary, Balanced, Detailed, Comprehensive
"manual_keywords" : "", # Manual keywords for filtering
"enable_reference_filtering" : True, # Reference relevance filtering
"relevance_threshold" : 0.3, # Minimum relevance score for references
"detailed_instructions" : "", # Custom detailed instructions for the LLM
}
## Different type of data can be provided here and will be included in the flow
self.data_handles = SimpleDataHandle()
## Small LLM usage tracking
self.small_llm_usage = {
"keyword_extraction": 0,
"query_expansion": 0,
"document_summarization": 0,
"total_calls": 0
}
## Large LLM usage tracking
self.large_llm_usage = 0
## Define the UI elements to be exposed
self.chat_interface=pn.chat.ChatInterface(callback=self.response_callback,width=1200,callback_exception='verbose')
## Plan for chat memory
self.chat_memory = SimpleChatMemory(max_history=self.flow_control["memory_max_size"])
self.extended_query=None
# Set up the blocks_dict for references
self.blocks_dict = {}
self.block_counter = 1
# Explicitly set OpenAI API type for this class
os.environ["OPENAI_API_TYPE"] = "openai"
# Initialize extensive search manager
self.extensive_search_manager = None
# Initialize instruction templates
self.instruction_templates = {
"Default": """Please provide a comprehensive and well-structured response using the available data sources. Format your response in clear Markdown with appropriate headings and sections.""",
"Vaccine Development": """You are an expert in vaccine development and formulation design. Please provide a detailed scientific response that includes:
**Structure Requirements:**
- Use clear scientific headings and subheadings
- Include methodology sections where relevant
- Provide specific technical details and parameters
- Reference regulatory guidelines when applicable
**Content Requirements:**
- Focus on mechanism of action, formulation stability, and manufacturing considerations
- Include safety and efficacy data when available
- Discuss regulatory pathways and requirements
- Consider scalability and commercial viability
**Formatting:**
- Use bullet points for key findings
- Include tables for comparative data when appropriate
- Cite all sources with inline references
- End with a clear summary of key takeaways""",
"Scientific Report": """Generate a comprehensive scientific report with the following structure:
## Executive Summary
Brief overview of key findings and conclusions
## Introduction
Background and context for the analysis
## Methodology
Approach and data sources used
## Results and Analysis
Detailed findings with supporting data
## Discussion
Interpretation of results and implications
## Conclusions
Key takeaways and recommendations
## References
All cited sources
**Formatting Guidelines:**
- Use professional scientific language
- Include quantitative data where available
- Provide statistical analysis when relevant
- Maintain objective tone throughout""",
"Technical Analysis": """Provide a detailed technical analysis focusing on:
**Technical Specifications:**
- Detailed parameters and measurements
- Performance metrics and benchmarks
- Technical constraints and limitations
**Analysis Structure:**
- Problem definition and scope
- Technical approach and methodology
- Detailed results with supporting data
- Risk assessment and mitigation strategies
- Implementation recommendations
**Format Requirements:**
- Use technical terminology appropriately
- Include diagrams or flowcharts concepts when relevant
- Provide step-by-step procedures where applicable
- Include troubleshooting guidance
- Reference industry standards and best practices""",
"Legal Document Analysis": """You are a legal document analyst specializing in precise clause extraction and legal interpretation. Provide a comprehensive legal analysis with the following structure:
## Document Overview
- Document type and classification
- Parties involved and their roles
- Primary legal purpose and scope
## Key Provisions Analysis
**For each significant clause/provision:**
- **Literal Text**: Provide exact verbatim quotations from source documents
- **Section/Article Reference**: Include specific section numbers or clause identifiers
- **Legal Interpretation**: Explain the legal meaning and implications
- **Practical Impact**: Describe real-world consequences and obligations
## Temporal Provisions
**Extract all time-sensitive elements:**
- **Effective Dates**: When provisions take effect
- **Expiration Dates**: When provisions terminate
- **Renewal Terms**: Automatic or manual renewal conditions
- **Notice Periods**: Required advance notice for actions
- **Statute of Limitations**: Time limits for legal actions
- **Performance Deadlines**: Time-bound obligations
## Rights and Obligations Matrix
- **Party A Rights/Obligations**: Clearly delineated responsibilities
- **Party B Rights/Obligations**: Corresponding duties and entitlements
- **Conditional Provisions**: Rights/duties triggered by specific conditions
- **Default Provisions**: Consequences of non-compliance
## Risk Assessment
- **Legal Risks**: Potential liabilities and exposures
- **Compliance Requirements**: Mandatory actions to avoid violations
- **Dispute Resolution**: Mechanisms for handling conflicts
- **Termination Conditions**: Grounds for contract/agreement termination
## Critical Dates Calendar
Present chronological timeline of all important dates identified in the documents.
**Formatting Guidelines:**
- Use precise legal terminology
- Maintain strict accuracy in quotations
- Cross-reference related provisions
- Highlight potential ambiguities or conflicts
- Provide clear section headers for easy navigation
- Include exact citations and page references where available"""
}
# Load custom templates from file if available
self.load_custom_templates()
self.init_connections()
return
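# --- Usage sketch (illustrative, not part of the class body) ---
# Shows how a caller might adjust the flow-control defaults defined in __init__
# before serving the Panel chat UI. Attribute and key names come from the code
# above; the Neo4j/ChromaDB services required by init_connections() are assumed
# to be reachable.
rag = OneCo_hybrid_RAG()
rag.flow_control["enable_search"] = True        # switch on Serper web search
rag.flow_control["detail_level"] = "Detailed"   # Summary / Balanced / Detailed / Comprehensive
rag.chat_interface.servable()                   # expose the pn.chat.ChatInterface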
def get_instruction_template(self, template_name):
"""Get instruction template by name"""
return self.instruction_templates.get(template_name, "")
def save_instruction_template(self, template_name, instructions):
"""Save a custom instruction template"""
try:
# Add to memory
self.instruction_templates[template_name] = instructions
# Save to file
custom_templates_file = "custom_instruction_templates.json"
custom_templates = {}
# Load existing custom templates
try:
with open(custom_templates_file, 'r') as f:
custom_templates = json.load(f)
except FileNotFoundError:
pass
# Add new template
custom_templates[template_name] = instructions
# Save back to file
with open(custom_templates_file, 'w') as f:
json.dump(custom_templates, f, indent=2)
print(f"ā
Instruction template '{template_name}' saved successfully")
return True
except Exception as e:
print(f"ā Error saving template '{template_name}': {e}")
return False
def load_custom_templates(self):
"""Load custom instruction templates from file"""
try:
custom_templates_file = "custom_instruction_templates.json"
with open(custom_templates_file, 'r') as f:
custom_templates = json.load(f)
# Add custom templates to the main dictionary
self.instruction_templates.update(custom_templates)
print(f"š Loaded {len(custom_templates)} custom instruction templates")
except FileNotFoundError:
print("š No custom instruction templates file found - starting fresh")
except Exception as e:
print(f"ā Error loading custom templates: {e}")
def init_connections(self):
uri = config.DB_ADDR
user, password = config.DB_AUTH
self.driver = GraphDatabase.driver(uri, auth=(user, password))
self.session = self.driver.session(database=config.DB_NAME)
api_key = "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" # Replace with your actual API key
self.chroma_embedder=MyEmbeddingFunction("gpt-4o-mini","text-embedding-3-small",api_key)
self.chroma_client=chromadb.HttpClient(host='vice_chroma', port=8000)
# Get collection names (handle both ChromaDB v0.5.x and v0.6.0+ API changes)
try:
print(f"š Attempting to connect to ChromaDB at vice_chroma:8000...")
collections = self.chroma_client.list_collections()
print(f"ā
ChromaDB connection successful!")
print(f"š Raw collections response: {collections} (type: {type(collections)})")
# Handle ChromaDB API version differences
if collections and isinstance(collections[0], str):
# v0.6.0+: collections are already strings (collection names)
self.available_collections = list(collections)
print(f"ā
ChromaDB v0.6.0+ detected: Found {len(self.available_collections)} collections")
print(f" Collections: {self.available_collections}")
elif collections and hasattr(collections[0], 'name'):
# Older version (v0.5.x): collections are objects with .name attribute
self.available_collections = [col.name for col in collections]
print(f"ā
ChromaDB v0.5.x detected: Found {len(self.available_collections)} collections")
print(f" Collections: {self.available_collections}")
elif collections:
# Handle any other format by trying to convert
try:
self.available_collections = [str(col) for col in collections]
print(f"ā
ChromaDB: Found {len(self.available_collections)} collections (converted to strings)")
print(f" Collections: {self.available_collections}")
except:
self.available_collections = []
print("ā ChromaDB: Could not convert collection list")
else:
self.available_collections = []
print("ā ļø ChromaDB: No collections found - database may be empty")
except Exception as e:
print(f"ā ChromaDB Connection Error: {e}")
print(f" Error type: {type(e).__name__}")
# Suppress the specific v0.6.0 warning we know about
if "v0.6.0" not in str(e) and "list_collections only returns collection names" not in str(e):
print(f"Warning: Could not retrieve ChromaDB collections: {e}")
self.available_collections = []
# Initialize extensive search manager
self.extensive_search_manager = ExtensiveSearchManager(
session=self.session,
chroma_client=self.chroma_client,
api_key=api_key,
rag_instance=self # Pass the RAG instance for usage tracking
)
return
def run_query(self, query, params=None):
"""
Execute a Cypher query and return the result
Parameters
----------
query : str
The Cypher query to execute
params : dict, optional
Parameters for the query
Returns
-------
result
The query result
"""
if params is None:
params = {}
return self.session.run(query, params)
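# --- Usage sketch (illustrative) ---
# run_query() forwards a Cypher string plus an optional parameter dict to the open
# Neo4j session created in init_connections(). Only the method signature comes from
# the code above; the Cypher text and `rag` instance (from the earlier sketch) are assumptions.
result = rag.run_query(
    "MATCH (d:Document) WHERE d.name CONTAINS $term RETURN d.UID AS uid LIMIT 5",
    {"term": "vaccine"},
)
for record in result:
    print(record["uid"])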
def evaluate_query(self, query, params=None):
"""
Execute a Cypher query and return a single result
Parameters
----------
query : str
The Cypher query to execute
params : dict, optional
Parameters for the query
Returns
-------
object
The single result value
"""
if params is None:
params = {}
result = self.session.run(query, params)
record = result.single()
if record:
return record[0]
return None
def push_changes(self, node):
"""
Push changes to a node to the database
Parameters
----------
node : dict or node-like object
Node with properties to update
"""
# Extract node properties, handling both dict-like and node-like objects
if hasattr(node, 'items'):
# Dict-like object
properties = {k: v for k, v in node.items() if k != 'labels'}
labels = node.get('labels', [])
uid = node.get('UID')
else:
# Node-like object from previous driver
properties = {k: node[k] for k in node.keys() if k != 'UID'}
labels = list(node.labels)
uid = node['UID']
# Construct labels string for Cypher
if labels:
labels_str = ':'.join(labels)
match_clause = f"MATCH (n:{labels_str} {{UID: $uid}})"
else:
match_clause = "MATCH (n {UID: $uid})"
# Update node properties
if properties:
set_clauses = [f"n.`{key}` = ${key}" for key in properties]
query = f"{match_clause} SET {', '.join(set_clauses)}"
params = {"uid": uid, **properties}
self.run_query(query, params)
return
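# --- Usage sketch (illustrative) ---
# push_changes() accepts a dict-like node carrying 'UID' and optional 'labels' and
# issues a MATCH ... SET query for the remaining properties. The node below is
# hypothetical; the generated Cypher would be roughly:
#   MATCH (n:Document {UID: $uid}) SET n.`UID` = $UID, n.`status` = $status
rag.push_changes({"UID": "doc-001", "labels": ["Document"], "status": "reviewed"})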
def count_tokens(self,text):
encoding = tiktoken.get_encoding("cl100k_base")
return len(encoding.encode(text))
def set_api_keys(self):
## Public openAI key
os.environ["OPENAI_API_KEY"]='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'
## Serper API key
os.environ["SERPER_API_KEY"] = "9a1f42c99feee69526e216af14e07b64fb4b3bfb"
## AzureOpenAI endpoint
os.environ["AZURE_OPENAI_ENDPOINT"] = "https://vice-llm-2.openai.azure.com/openai/deployments/OneCo-gpt/chat/completions?api-version=2024-08-01-preview"
## AzureOpenAI key
os.environ["AZURE_OPENAI_API_KEY"] = "8DaDtzYz3HePiypmFb6JQmJd3zUCtyCQkiYE8bePRnpyk2YNkJZRJQQJ99BAACfhMk5XJ3w3AAABACOGyJVB"
return
def extract_core_query(self,query_text):
"""
Extracts the core information-seeking question from a user query that may contain
both a question and processing instructions for the RAG system.
Args:
query_text: The original user query text
Returns:
dict: Contains the extracted information with keys:
- core_question: The actual information need/question
- instructions: Any processing instructions found
- is_complex: Boolean indicating if query contained instructions
"""
# Use the pre-model (smaller model) for this extraction task
llm = ChatOpenAI(
model=self.flow_control['pre_model'][1],
temperature=self.flow_control['pre_model'][2],
)
prompt = f"""
You are an AI query analyzer. Your task is to analyze the following user query and separate it into:
1. The core information-seeking question (what the user actually wants to know)
2. Any processing instructions (how the user wants information presented or processed)
User query: {query_text}
Output your analysis in strict JSON format:
```json
{{
"core_question": "The main question or information need",
"instructions": "Any processing instructions (or empty string if none)",
"is_complex": true/false (true if query contains instructions, false if it's just a question)
}}
```
Examples:
Input: "Tell me about mRNA vaccines and format the answer with bullet points"
Output:
```json
{{
"core_question": "Tell me about mRNA vaccines",
"instructions": "format the answer with bullet points",
"is_complex": true
}}
```
Input: "What are the main types of vaccine adjuvants?"
Output:
```json
{{
"core_question": "What are the main types of vaccine adjuvants?",
"instructions": "",
"is_complex": false
}}
```
Only respond with the JSON output, nothing else.
"""
response = llm.invoke(prompt)
try:
# Extract JSON from response if needed
content = response.content
if '```json' in content:
content = content.split('```json')[1].split('```')[0].strip()
elif '```' in content:
content = content.split('```')[1].split('```')[0].strip()
result = json.loads(content)
return result
except Exception as e:
# Fallback if parsing fails
print(f"Error parsing LLM response: {e}")
return {
"core_question": query_text,
"instructions": "",
"is_complex": False
}
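# --- Usage sketch (illustrative) ---
# extract_core_query() returns a dict with 'core_question', 'instructions' and
# 'is_complex', falling back to the raw query when the JSON reply cannot be parsed.
# The query text is hypothetical and the call hits the configured pre_model.
analysis = rag.extract_core_query("Summarise adjuvant trends and format the answer as a table")
print(analysis["core_question"], analysis["is_complex"])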
def extract_serper_results(self, serper_response):
"""
Extract formatted search results and URLs from GoogleSerperAPI response.
Args:
serper_response: Raw response from GoogleSerperAPI (JSON object or string)
Returns:
tuple: (formatted_results, extracted_urls)
"""
search_results = ""
extracted_urls = []
try:
# Convert to dict if it's a string
if isinstance(serper_response, str):
try:
data = json.loads(serper_response)
except json.JSONDecodeError as e:
print(f"Error parsing Serper JSON: {e}")
return "Error processing search results.", []
else:
# It's already a dict/object
data = serper_response
# Add search query to the results
if 'searchParameters' in data and 'q' in data['searchParameters']:
search_query = data['searchParameters']['q']
search_results += f"### Search Results for: '{search_query}'\n\n"
else:
search_results += "### Search Results\n\n"
# Process organic search results
if 'organic' in data and isinstance(data['organic'], list):
for i, result in enumerate(data['organic']):
title = result.get('title', 'No title')
link = result.get('link', '')
snippet = result.get('snippet', '')
date = result.get('date', '')
# Format the result with block reference
block_num = self.block_counter + len(extracted_urls)
search_results += f"[block {block_num}] **{title}**\n"
if date:
search_results += f"*{date}*\n"
search_results += f"{snippet}\n"
search_results += f"URL: {link}\n\n"
# Add to extracted URLs
extracted_urls.append({
'title': title,
'url': link,
'snippet': snippet,
'date': date
})
# Process "People Also Ask" section
if 'peopleAlsoAsk' in data and isinstance(data['peopleAlsoAsk'], list):
search_results += "#### People Also Ask\n\n"
for i, qa in enumerate(data['peopleAlsoAsk']):
question = qa.get('question', '')
snippet = qa.get('snippet', '')
title = qa.get('title', '')
link = qa.get('link', '')
# Format the result with block reference
block_num = self.block_counter + len(extracted_urls)
search_results += f"[block {block_num}] **{question}**\n"
search_results += f"*{title}*\n"
search_results += f"{snippet}\n"
search_results += f"URL: {link}\n\n"
# Add to extracted URLs
extracted_urls.append({
'title': f"{question} - {title}",
'url': link,
'snippet': snippet
})
# If no results were found
if not extracted_urls:
search_results += "No search results were found.\n"
except Exception as e:
print(f"Error extracting Serper results: {e}")
search_results = "Error processing search results.\n"
return search_results, extracted_urls
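# --- Input-shape sketch (illustrative) ---
# extract_serper_results() reads the Serper payload keys used above
# ('searchParameters', 'organic', optionally 'peopleAlsoAsk'). A minimal,
# hypothetical payload and call:
fake_serper = {
    "searchParameters": {"q": "mRNA vaccine stability"},
    "organic": [
        {"title": "Example result", "link": "https://example.org", "snippet": "Short snippet.", "date": "2024-01-01"}
    ],
}
formatted, urls = rag.extract_serper_results(fake_serper)
print(formatted)
print(len(urls), "URLs extracted")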
def response_callback(self, query):
print("=" * 80)
print(f"š Starting response callback for query: '{query[:100]}{'...' if len(query) > 100 else ''}'")
print("=" * 80)
# Get detail level from flow control
detail_level = self.flow_control.get("detail_level", "Balanced")
print(f"š Detail level: {detail_level}")
# Reset small LLM usage tracking for this query
self.small_llm_usage = {
"keyword_extraction": 0,
"query_expansion": 0,
"document_summarization": 0,
"total_calls": 0
}
## We make a difference between the search enabled or disabled mode - the first will have 2 separate LLM calls.
## Common part - prepare the data
print("š Step 1: Extracting core query...")
query_analysis = self.extract_core_query(query)
print(f"ā
Core query extracted: {query_analysis}")
search_query = query_analysis["core_question"]
print(f"š Search query: {search_query}")
# Store the analysis for later use in processing
self.current_query_analysis = query_analysis
# Parse handler using the core question for retrieval
print("š Step 2: Gathering data from all sources...")
data_sections = self.parse_handler(search_query, detail_level)
print(f"ā
Data gathering complete. Found {len(data_sections)} data sections")
## prepare LLM following flow control
print("š¤ Step 3: Preparing main LLM...")
if self.flow_control['model'][0]=="OpenAi":
llm = ChatOpenAI(
model=self.flow_control['model'][1],
temperature=self.flow_control['model'][2],
timeout=None,
max_retries=2)
print(f"š¤ Main LLM: OpenAI {self.flow_control['model'][1]} (temp: {self.flow_control['model'][2]})")
elif self.flow_control['model'][0]=="Azure":
llm = AzureChatOpenAI(
azure_deployment=self.flow_control['model'][1],
api_version=self.flow_control['model'][3],
temperature=self.flow_control['model'][2],
max_tokens=2500,
timeout=None,
max_retries=2)
print(f"š¤ Main LLM: Azure {self.flow_control['model'][1]} (temp: {self.flow_control['model'][2]})")
else:
llm = ChatOpenAI(
model='gpt-4o',
temperature=0,
timeout=None,
max_retries=2)
print("š¤ Main LLM: OpenAI gpt-4o (default fallback)")
## Search enabled mode
self.search_results = ""
if self.flow_control["enable_search"]:
print("š Step 4: Web search enabled - generating search queries...")
## generate a first response to start the search
prompt=self.generate_prompt("Vaccine_google",data_sections,query)
print(f"š§ Using preprocessing LLM for search query generation...")
answer = llm.invoke(prompt)
print(f"ā
Search queries generated (response length: {len(answer.content)} chars)")
dict=json.loads(answer.content[8:-4])
search_tool = GoogleSerperAPIWrapper()
# Create a counter for web references
web_ref_count = self.block_counter
print(f"š Executing {len(dict['search_queries'])} web searches...")
for i, s in enumerate(dict['search_queries'], 1):
print(f"š Search {i}/{len(dict['search_queries'])}: {s}")
# Parse Serper results to extract content and URLs
search_output=search_tool.results(s)
#print("search output", search_output)
search_results, extracted_urls = self.extract_serper_results(search_output)
self.search_results = self.search_results + "\n" + search_results
print(f"ā
Search {i} complete: Found {len(extracted_urls)} URLs")
# Add extracted URLs to blocks_dict for reference
for url_info in extracted_urls:
title = url_info.get('title', 'Web Page')
url = url_info.get('url', '')
snippet = url_info.get('snippet', '')
# Add reference in blocks_dict
self.blocks_dict[self.block_counter] = {
"type": "web",
"id": f"web_{self.block_counter}",
"url": url,
"title": title,
"snippet": snippet,
"content": f"Web search result: {title}. {url}"
}
self.block_counter += 1
print(f"š Web search complete: Total {self.block_counter - web_ref_count} web references added")
else:
print("š Web search disabled - skipping")
print(f"š Total blocks in reference dictionary: {len(self.blocks_dict)}")
## This is the common part for both modes
print("š§ Step 5: Generating final response with main LLM...")
prompt=self.generate_prompt("Vaccine_base",data_sections,query)
# Count tokens in prompt for monitoring
prompt_tokens = self.count_tokens(prompt)
print(f"š Final prompt size: {prompt_tokens:,} tokens")
answer = llm.invoke(prompt)
response_tokens = self.count_tokens(answer.content)
print(f"ā
Main LLM response generated ({response_tokens:,} tokens)")
# If reference formatting is enabled, apply it
if self.flow_control["enable_referencing"]:
print("š Step 6: Processing references and formatting...")
print(f" ⢠Blocks available in blocks_dict: {list(self.blocks_dict.keys())}")
print(f" ⢠Total blocks: {len(self.blocks_dict)}")
# Debug: Show first 200 chars of answer to see what citations might be there
print(f" ⢠Answer preview: {answer.content[:300]}...")
# Check if this is extensive search mode (comprehensive blocks already contain references)
has_extensive_blocks = any(
block_data.get("type", "").startswith("comprehensive_extensive")
for block_data in self.blocks_dict.values()
)
if has_extensive_blocks:
print(" š Extensive search mode detected: comprehensive blocks already contain reference lists")
# Collect all references from comprehensive blocks and create a single consolidated reference section
all_references = []
total_ref_count = 0
for block_id, block_data in self.blocks_dict.items():
if block_data.get("type", "").startswith("comprehensive_extensive"):
# Get the stored reference section and extract individual references
ref_section = block_data.get("reference_section", "")
ref_count = block_data.get("reference_count", 0)
total_ref_count += ref_count
if ref_section:
# Extract the reference lines (skip the header and separator lines)
ref_lines = ref_section.split('\n')
for line in ref_lines:
line = line.strip()
# Skip empty lines, separator lines, and header lines
if line and not line.startswith('---') and not line.startswith('**References:**'):
all_references.append(line)
# Create a single consolidated reference section if we have references
if all_references:
# Remove duplicates while preserving order, considering only the content after the number
seen_content = set()
unique_references = []
ref_counter = 1
for ref in all_references:
# Extract the content after the reference number (e.g., "[1]: Document A" -> "Document A")
if ']:' in ref:
content_part = ref.split(']:')[1].strip()
else:
content_part = ref.strip()
# Check if we've seen this content before
if content_part not in seen_content:
seen_content.add(content_part)
# Renumber the reference to maintain sequential numbering
unique_references.append(f"[{ref_counter}]: {content_part}")
ref_counter += 1
# Clean up the main answer to remove any [Block X] style references
import re
cleaned_answer = re.sub(r'\[Block \d+\]', '', answer.content)
# Only clean up excessive spaces (not newlines) to preserve markdown formatting
cleaned_answer = re.sub(r'[ \t]+', ' ', cleaned_answer) # Replace multiple spaces/tabs with single space
cleaned_answer = re.sub(r'[ \t]*\n[ \t]*', '\n', cleaned_answer) # Clean up spaces around newlines
cleaned_answer = cleaned_answer.strip()
# Create the consolidated reference section
consolidated_references = "\n\n---\n\n**References:**\n" + "\n".join(unique_references)
formatted_answer = cleaned_answer + consolidated_references
print(f"ā
References consolidated from {len([bd for bd in self.blocks_dict.values() if bd.get('type', '').startswith('comprehensive_extensive')])} comprehensive blocks")
print(f" ⢠Total individual references collected: {len(all_references)}")
print(f" ⢠Unique references after content-based deduplication: {len(unique_references)}")
print(f" ⢠Expected total references: {total_ref_count}")
print(f" ⢠Duplicates removed: {len(all_references) - len(unique_references)}")
print(f" ⢠[Block X] references cleaned from main content")
else:
formatted_answer = answer.content
print(f"ā ļø No reference sections found in comprehensive blocks")
print(f" ⢠Reference consolidation complete - single reference section created")
else:
# Standard mode: process citations and create reference section
print(" š Standard mode: processing citations and creating reference section")
ref_manager = ReferenceManager(default_style="apa")
processed_text, references_section = ref_manager.process_references(
answer.content,
self.blocks_dict,
style="apa"
)
formatted_answer = processed_text + "\n\n" + references_section
print(f"ā
References processed and formatted")
print(f" ⢠References section length: {len(references_section)} characters")
else:
formatted_answer = answer.content
print("š Reference formatting disabled")
self.chat_memory.save_context(
{"role": "user", "content": query},
{"role": "assistant", "content": answer.content},
)
print("=" * 80)
print(f"šÆ Response complete! Final response: {len(formatted_answer):,} characters")
print(f"š¾ Total blocks in context: {len(self.blocks_dict)}")
print(f"\nš¤ SMALL LLM USAGE SUMMARY:")
print(f" ⢠Keyword extraction calls: {self.small_llm_usage['keyword_extraction']}")
print(f" ⢠Query expansion calls: {self.small_llm_usage['query_expansion']}")
print(f" ⢠Document summarization calls: {self.small_llm_usage['document_summarization']}")
print(f" ⢠Total small LLM calls: {self.small_llm_usage['total_calls']}")
print("=" * 80)
return pn.pane.Markdown(formatted_answer)
def get_embedding(self,text):
"""Generate an embedding for the given text using OpenAI's text-embedding-ada-002 model."""
response = openai.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
def extract_for_queries(self,text, queries, max_tokens=5000, api_key=None):
"""
Extract information from text based on queries.
Args:
text: Text to extract from
queries: List of queries to guide extraction
max_tokens: Maximum tokens in the output
api_key: API key for the LLM service
Returns:
Extracted text relevant to the queries
"""
api_key = "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" # Replace with your actual API key
extractor = QueryBasedExtractor(
max_output_tokens=max_tokens,
api_key=api_key,
model_name="gpt-4o-mini" # Or another small model
)
return extractor.extract(text, queries)
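# --- Usage sketch (illustrative) ---
# extract_for_queries() delegates to the external QueryBasedExtractor with a small
# gpt-4o-mini model. Only the call pattern below is taken from the code; the input
# text and query are hypothetical.
condensed = rag.extract_for_queries(
    "Long source text about formulation buffers and stability studies ...",
    ["Which buffers are discussed?"],
    max_tokens=2000,
)
print(condensed)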
def parse_handler(self, query, detail_level="Balanced"):
data_sections = {}
# Create blocks_dict directly in the format needed by ReferenceManager
self.blocks_dict = {} # Replace self.inline_refs with self.blocks_dict
self.block_counter = 1 # Start block numbering from 1 to match example
for key in self.data_handles.handlers.keys():
if self.data_handles.handlers[key]["type"] == "text":
data_sections[key] = f"[block {self.block_counter}] {self.data_handles.handlers[key]['data']}"
# Create block entry in proper format
self.blocks_dict[self.block_counter] = {
"type": "generic",
"id": f"text_{self.block_counter}",
"content": f"Text content: {self.data_handles.handlers[key]['data'][:100]}..."
}
elif self.data_handles.handlers[key]["type"] == "dataframe":
data_sections[key] = f"[block {self.block_counter}] {self.extract_for_queries(self.data_handles.handlers[key]['data'].to_markdown(), [query])}"
# Create block entry for dataframe
self.blocks_dict[self.block_counter] = {
"type": "generic",
"id": f"dataframe_{self.block_counter}",
"content": f"Dataframe content from {key}"
}
elif self.data_handles.handlers[key]["type"] == "vectorstore":
data_sections[key] = self.collect_text_blocks(self.data_handles.handlers[key], query)
elif self.data_handles.handlers[key]["type"] == "db_search":
data_sections[key] = self.collect_data_from_neo4j(self.data_handles.handlers[key], query, detail_level=detail_level)
elif self.data_handles.handlers[key]["type"] == "chromaDB":
data_sections[key] = self.collect_data_from_chroma(self.data_handles.handlers[key], query, detail_level)
self.block_counter += 1
return data_sections
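# --- Handler-shape sketch (illustrative) ---
# parse_handler() dispatches on handler["type"] ('text', 'dataframe', 'vectorstore',
# 'db_search', 'chromaDB') and reads handler["data"]. A minimal, hypothetical text
# handler, assuming SimpleDataHandle.handlers behaves like a plain dict:
rag.data_handles.handlers["notes"] = {
    "type": "text",
    "data": "Internal memo: lipid nanoparticle batch 42 passed stability testing.",
}
sections = rag.parse_handler("lipid nanoparticle stability")
print(sections["notes"][:80])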
def reformat_data(self, data, min_document_length=30, similarity_threshold=0.95, use_crossencoder=False, inclusions=10):
"""
Reformat and filter data to be grouped by ID, excluding too-short documents
and documents that are too similar to each other. Optionally applies crossencoder ranking.
Args:
data: Original data structure
min_document_length: Minimum character length for documents to include (default: 30)
similarity_threshold: Threshold for document similarity (default: 0.95, higher means more similar)
use_crossencoder: Whether to apply crossencoder reranking (default: False)
inclusions: Number of documents to return after filtering (default: 10)
Returns:
List of selected documents (not dictionary)
"""
from sentence_transformers import CrossEncoder
import numpy as np
result = {}
selected_docs = []
selected_embeddings = []
# Unpack the nested lists for easier access
ids_list = data['ids'][0]
documents_list = data['documents'][0]
metadatas_list = data['metadatas'][0]
embeddings_array = data['embeddings'][0]
# First pass: filter by document length and organize data
candidates = []
for i, id_val in enumerate(ids_list):
# Check if the document meets the length requirement and does not exceed a max token length
if len(documents_list[i]) >= min_document_length and self.count_tokens(documents_list[i]) <= 10000:
candidates.append({
'id': id_val,
'document': documents_list[i],
'metadata': metadatas_list[i],
'embedding': embeddings_array[i].tolist() if embeddings_array is not None else None
})
# If we don't have enough candidates, return all we have
if len(candidates) <= inclusions:
return [(doc['metadata'],doc['document']) for doc in candidates]
# Second pass: filter by similarity
for candidate in candidates:
candidate_embedding = np.array(candidate['embedding'])
# Normalize embedding
norm = np.linalg.norm(candidate_embedding)
if norm > 0:
candidate_embedding = candidate_embedding / norm
# Check if candidate is too similar to any already selected document
is_redundant = False
for sel_emb in selected_embeddings:
similarity = np.dot(candidate_embedding, sel_emb)
if similarity >= similarity_threshold:
is_redundant = True
break
if not is_redundant:
# Add to result dictionary
result[candidate['id']] = {
'document': candidate['document'],
'metadata': candidate['metadata'],
'embedding': candidate['embedding']
}
# Add to selected lists for similarity checks
selected_docs.append(candidate)
selected_embeddings.append(candidate_embedding)
# If we've collected enough documents and don't need crossencoder, we can stop
#if len(selected_docs) >= inclusions * 2 and not use_crossencoder:
# break
# If using crossencoder for reranking
if use_crossencoder and len(selected_docs) > inclusions:
query = data.get('query_text', '')
if not query: # If no query provided, use a placeholder
query = "default query" # Ideally this should be passed in
cross_model = CrossEncoder('BAAI/bge-reranker-base')
query_doc_pairs = [(query, doc['document']) for doc in selected_docs]
scores = cross_model.predict(query_doc_pairs)
# Zip documents with their scores and sort by score (highest first)
doc_score_pairs = list(zip(selected_docs, scores))
ranked_docs = sorted(doc_score_pairs, key=lambda x: x[1], reverse=True)
# Take the top 'inclusions' documents after reranking
selected_docs = [doc for doc, _ in ranked_docs[:inclusions]]
elif len(selected_docs) > inclusions:
# If not using crossencoder but have too many docs, just take the first 'inclusions'
selected_docs = selected_docs[:inclusions]
# Return just the document text for further processing
#print("returning ",[(doc['metadata'],doc['document']) for doc in selected_docs])
return [(doc['metadata'],self.extract_for_queries(doc['document'],self.extended_query)) for doc in selected_docs]
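# --- Input-shape sketch (illustrative) ---
# reformat_data() expects the nested-list layout returned by a Chroma query: one inner
# list per query for 'ids', 'documents', 'metadatas' and 'embeddings'. A minimal,
# hypothetical payload (embeddings as a numpy array so .tolist() works); with only two
# candidates and inclusions=2 the early-return path is taken and no LLM call is made.
import numpy as np
chroma_like = {
    "ids": [["chunk-1", "chunk-2"]],
    "documents": [["First chunk of text that is long enough.", "Second chunk of text that is long enough."]],
    "metadatas": [[{"title": "Doc A"}, {"title": "Doc B"}]],
    "embeddings": [np.random.rand(2, 8)],
}
selected = rag.reformat_data(chroma_like, inclusions=2)
print(selected)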
def score_reference_relevance(self, final_answer, reference_documents, relevance_threshold=0.3):
"""
Score the relevance of each reference document against the final answer using a small LLM.
Args:
final_answer: The generated answer text
reference_documents: List of (metadata, content) tuples for reference documents
relevance_threshold: Minimum score to include a reference (0.0-1.0)
Returns:
List of (metadata, content, score) tuples for documents above threshold
"""
print(f"šÆ Scoring reference relevance against final answer...")
print(f" ⢠Evaluating {len(reference_documents)} reference documents")
print(f" ⢠Relevance threshold: {relevance_threshold}")
# Use small LLM for scoring
llm = ChatOpenAI(
model=self.flow_control['pre_model'][1],
temperature=0
)
relevant_references = []
# Create a short summary of the final answer for comparison (first 1000 chars)
answer_summary = final_answer[:1000] + "..." if len(final_answer) > 1000 else final_answer
for i, (metadata, content) in enumerate(reference_documents):
print(f" Scoring document {i+1}/{len(reference_documents)}: {metadata.get('title', 'Unknown')[:50]}...")
# Create a short excerpt from document content for scoring
content_excerpt = content[:800] + "..." if len(content) > 800 else content
prompt = f"""
You are a document relevance scorer. Your task is to determine how relevant a reference document is to a final answer.
FINAL ANSWER (excerpt):
{answer_summary}
REFERENCE DOCUMENT (excerpt):
{content_excerpt}
Rate the relevance of this reference document to the final answer on a scale of 0.0 to 1.0:
- 0.0: Completely irrelevant, no useful information for the answer
- 0.3: Somewhat relevant, provides background or tangential information
- 0.5: Moderately relevant, provides supporting information
- 0.7: Highly relevant, provides key information used in the answer
- 1.0: Extremely relevant, essential for the answer
Consider:
- Does the document contain information that directly supports the answer?
- Are there shared topics, concepts, or findings?
- Would removing this reference make the answer less accurate or complete?
Respond with ONLY a number between 0.0 and 1.0, nothing else.
"""
try:
response = llm.invoke(prompt)
score_text = response.content.strip()
# Extract number from response, handling various formats
import re
# Look for decimal numbers in the response
number_match = re.search(r'(\d+\.?\d*)', score_text)
if number_match:
score = float(number_match.group(1))
# Ensure score is between 0.0 and 1.0
score = max(0.0, min(1.0, score))
else:
print(f" ā ļø Could not parse score from: '{score_text}', using 0.5")
score = 0.5
if score >= relevance_threshold:
relevant_references.append((metadata, content, score))
print(f" ā
Score: {score:.2f} (included)")
else:
print(f" ā Score: {score:.2f} (filtered out)")
except Exception as e:
print(f" ā ļø Error scoring document: {e}")
print(f" ā ļø Response was: '{response.content if 'response' in locals() else 'No response'}'")
# Default to including if scoring fails
score = 0.5
relevant_references.append((metadata, content, score))
print(f" ā
Using default score: {score:.2f} (included)")
print(f" ā
Reference relevance scoring complete")
print(f" ⢠Documents evaluated: {len(reference_documents)}")
print(f" ⢠Documents above threshold: {len(relevant_references)}")
print(f" ⢠Documents filtered out: {len(reference_documents) - len(relevant_references)}")
# Sort by relevance score (highest first)
relevant_references.sort(key=lambda x: x[2], reverse=True)
return relevant_references
def extract_filter_keywords(self, query, n_keywords=2):
"""
Extract distinguishing keywords from a query for filtering search results.
Args:
query: The user's query text
n_keywords: Maximum number of keywords to extract
Returns:
List of keywords for filtering
"""
print(f"š Extracting filter keywords...")
print(f" š¤ Using small LLM ({self.flow_control['pre_model'][1]}) for keyword extraction")
self.small_llm_usage["keyword_extraction"] += 1
self.small_llm_usage["total_calls"] += 1
llm = ChatOpenAI(
model=self.flow_control['pre_model'][1],
temperature=0
)
# Make the instruction much more explicit about the exact count
prompt = f"""
You are a search optimization expert. Extract EXACTLY {n_keywords} specific distinguishing keyword(s) from this query:
"{query}"
Guidelines:
- You MUST return EXACTLY {n_keywords} keyword(s) - no more, no less
- Focus on proper nouns, company names, technical terms, and specific concepts
- Select word(s) that would differentiate this topic from related but irrelevant topics
- Choose word(s) that could filter out incorrect contexts (wrong companies, unrelated domains)
- Exclude common words like "the", "and", "of"
- Return ONLY a JSON array of strings with EXACTLY {n_keywords} string(s)
Example:
For "Impact of Pfizer's mRNA vaccine development on COVID-19 transmission" and n_keywords=1
Output: ["Pfizer"]
For "Impact of Pfizer's mRNA vaccine development on COVID-19 transmission" and n_keywords=3
Output: ["Pfizer", "mRNA", "COVID-19"]
Output format:
```json
["keyword1"{", keyword2" if n_keywords > 1 else ""}{", ..." if n_keywords > 2 else ""}]
```
Remember: I need EXACTLY {n_keywords} keyword(s). Count carefully before submitting.
"""
response = llm.invoke(prompt)
try:
# Extract JSON from response
content = response.content.strip()
if '```json' in content:
content = content.split('```json')[1].split('```')[0].strip()
elif '```' in content:
content = content.split('```')[1].split('```')[0].strip()
keywords = json.loads(content)
# Force the correct number of keywords
if len(keywords) > n_keywords:
keywords = keywords[:n_keywords]
elif len(keywords) < n_keywords and len(keywords) > 0:
# If we got fewer keywords than requested but at least one, duplicate the first one
while len(keywords) < n_keywords:
keywords.append(keywords[0])
keywords = [k.lower() for k in keywords] # Convert to lowercase for case-insensitive matching
print(f"ā
Filter keywords extracted: {keywords}")
return keywords
except Exception as e:
print(f"Error extracting keywords: {e}")
# Fall back to simple keyword extraction if LLM fails
words = query.lower().split()
stopwords = ['the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'with', 'about']
return [w for w in words if w not in stopwords and len(w) > 3][:n_keywords]
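# --- Usage sketch (illustrative) ---
# extract_filter_keywords() asks the small pre_model for exactly n_keywords terms and
# falls back to simple stop-word stripping if the JSON reply cannot be parsed. The
# query below is hypothetical; an OpenAI key must be configured for the LLM call.
keywords = rag.extract_filter_keywords("Impact of Pfizer's mRNA vaccine on cold-chain logistics", n_keywords=2)
print(keywords)  # e.g. ['pfizer', 'mrna']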
def collect_data_from_chroma(self, data, query, detail_level="Balanced"):
"""
Collect relevant documents from ChromaDB based on query with optimized workflow:
1) Combine results from all extended queries
2) Apply keyword filters across all results
3) Remove similar documents and apply cross-encoder if requested
4) Evaluate against target and add additional documents as needed
Args:
data: Configuration data for collection and processing
query: The user query
detail_level: Level of detail for the summary (Summary, Balanced, Detailed, Comprehensive)
Returns:
String with collected document blocks
"""
print("\n" + "=" * 60)
print(f"š CHROMADB COLLECTION: {data.get('data', 'Unknown')}")
print("=" * 60)
import re # For citation extraction in extensive search mode
# Start with empty collections
collected_blocks = []
candidate_docs = {
'ids': [],
'documents': [],
'metadatas': [],
'embeddings': []
}
# Extract filter keywords for hybrid search based on keyword filtering mode
filter_keywords = []
# Check if keyword filtering is enabled
keyword_filtering_enabled = self.flow_control.get("enable_keyword_filtering", False)
if keyword_filtering_enabled:
print("š Keyword filtering enabled - determining keyword source...")
# Check for manual keywords first
manual_keywords_str = self.flow_control.get("manual_keywords", "").strip()
if manual_keywords_str:
# Scenario 1: Manual keywords provided - use ONLY those
manual_keywords = [kw.strip() for kw in manual_keywords_str.split(',') if kw.strip()]
filter_keywords = manual_keywords
print(f"š Using manual keywords ONLY: {filter_keywords}")
else:
# Scenario 2: No manual keywords - use automatic LLM extraction
print("š¤ No manual keywords provided - using automatic LLM extraction...")
filter_keywords = self.extract_filter_keywords(query)
print(f"š Auto-extracted keywords: {filter_keywords}")
else:
print("āļø Keyword filtering disabled - skipping keyword extraction")
print(f"ā
Final filter keywords: {filter_keywords}")
# Configure retrieval parameters
use_crossencoder = "crossencoder" in data["processing_steps"]
target_docs = data["inclusions"]
initial_k = target_docs * 10 if use_crossencoder else target_docs * 3
print(f"š Retrieval Configuration:")
print(f" ⢠Target documents: {target_docs}")
print(f" ⢠Initial retrieval (k): {initial_k}")
print(f" ⢠Cross-encoder enabled: {use_crossencoder}")
print(f" ⢠Processing steps: {data['processing_steps']}")
# Generate extended queries if needed
if "extend_query" in data["processing_steps"]:
print("š Extending query with additional variations...")
self.extend_query(query)
self.extended_query.append(query)
print(f"ā
Query extension complete. Total queries: {len(self.extended_query)}")
else:
self.extended_query = [query]
print(f"š Using single query (no extension)")
# Get ChromaDB collection
# Extract collection name from data - handle both string and collection object cases
collection_name = data["data"]
if isinstance(collection_name, str):
# v0.6.0+: collection name is already a string
pass # collection_name is already correct
elif not isinstance(collection_name, str):
# If it's not a string, convert it to string and extract name if needed
collection_str = str(collection_name)
if 'Collection(name=' in collection_str:
# Extract name from string representation like "Collection(name=wuxi)"
match = re.search(r'Collection\(name=([^)]+)\)', collection_str)
if match:
collection_name = match.group(1)
else:
collection_name = collection_str
else:
collection_name = collection_str
print(f"ā
ChromaDB collection '{collection_name}' loaded successfully")
client = self.chroma_client.get_collection(collection_name, embedding_function=self.chroma_embedder)
# STEP 1: Retrieve candidate documents from all extended queries
print(f"\nš QUERY PROCESSING:")
print(f" ⢠Processing {len(self.extended_query)} extended queries")
print(f" ⢠Retrieving up to {initial_k} candidates per query")
all_ids = set() # Track IDs to avoid duplicates
for idx, q in enumerate(self.extended_query):
print(f" Query {idx+1}/{len(self.extended_query)}: '{q[:50]}{'...' if len(q) > 50 else ''}'")
# Retrieve a larger batch of documents
retrieved_docs = client.query(
query_texts=[q],
n_results=initial_k,
include=["documents", "metadatas", "embeddings"]
)
# Only process if we got results
query_count = 0
if retrieved_docs['documents'] and len(retrieved_docs['documents'][0]) > 0:
# Add unique documents to our candidates pool
for i, doc_id in enumerate(retrieved_docs['ids'][0]):
if doc_id not in all_ids:
all_ids.add(doc_id)
candidate_docs['ids'].append(doc_id)
candidate_docs['documents'].append(retrieved_docs['documents'][0][i])
candidate_docs['metadatas'].append(retrieved_docs['metadatas'][0][i])
if retrieved_docs['embeddings'] and len(retrieved_docs['embeddings'][0]) > i:
candidate_docs['embeddings'].append(retrieved_docs['embeddings'][0][i])
query_count += 1
print(f" ā Retrieved {len(retrieved_docs['documents'][0]) if retrieved_docs['documents'] else 0} total, {query_count} unique candidates")
print(f"\nš CANDIDATE DOCUMENT STATISTICS:")
print(f" ⢠Total unique candidates collected: {len(candidate_docs['ids'])}")
# STEP 2: Apply keyword filtering (skip for extensive search mode - will be applied later on full documents)
print(f"\nš KEYWORD FILTERING:")
keyword_filtering_enabled = self.flow_control.get("enable_keyword_filtering", False)
enable_extensive_search = self.flow_control.get("enable_extensive_search", False)
filtered_docs = []
# For extensive search mode, skip chunk-level keyword filtering
if enable_extensive_search:
print(f" š Extensive search mode: Deferring keyword filtering until full documents are retrieved")
# Use all candidates for extensive search - keyword filtering will be applied later
for i, doc in enumerate(candidate_docs['documents']):
embedding = candidate_docs['embeddings'][i] if i < len(candidate_docs['embeddings']) else None
filtered_docs.append({
'id': candidate_docs['ids'][i],
'document': doc,
'metadata': candidate_docs['metadatas'][i],
'embedding': embedding
})
print(f" ā
Using all {len(filtered_docs)} candidates for extensive search (keyword filtering deferred)")
elif keyword_filtering_enabled and filter_keywords and "keyword_filter" in data.get("processing_steps", []):
print(f" ⢠Applying chunk-level keyword filter with keywords: {filter_keywords}")
# First try documents containing ALL keywords
all_keywords_docs = []
for i, doc in enumerate(candidate_docs['documents']):
doc_lower = doc.lower()
if all(keyword.lower() in doc_lower for keyword in filter_keywords):
embedding = candidate_docs['embeddings'][i] if i < len(candidate_docs['embeddings']) else None
all_keywords_docs.append({
'id': candidate_docs['ids'][i],
'document': doc,
'metadata': candidate_docs['metadatas'][i],
'embedding': embedding
})
print(f" ⢠Documents with ALL keywords: {len(all_keywords_docs)}")
filtered_docs.extend(all_keywords_docs)
# If we don't have enough with all keywords, try documents with ANY keyword
if len(all_keywords_docs) < target_docs:
print(f" ⢠Looking for documents with ANY keyword (need {target_docs - len(all_keywords_docs)} more)")
any_keyword_docs = []
for i, doc in enumerate(candidate_docs['documents']):
doc_id = candidate_docs['ids'][i]
# Skip if already included
if any(d['id'] == doc_id for d in filtered_docs):
continue
doc_lower = doc.lower()
if any(keyword.lower() in doc_lower for keyword in filter_keywords):
embedding = candidate_docs['embeddings'][i] if i < len(candidate_docs['embeddings']) else None
any_keyword_docs.append({
'id': doc_id,
'document': doc,
'metadata': candidate_docs['metadatas'][i],
'embedding': embedding
})
print(f" ⢠Additional documents with ANY keyword: {len(any_keyword_docs)}")
filtered_docs.extend(any_keyword_docs)
print(f" ā
Total documents after chunk-level keyword filtering: {len(filtered_docs)}")
# Check if keyword filtering eliminated all documents
if len(filtered_docs) == 0:
print(f" ā KEYWORD FILTERING: No documents match keyword criteria '{', '.join(filter_keywords)}'")
print(f" š« Respecting filtering constraints - no content will be generated from this collection.")
print(f"\nšÆ CHROMADB COLLECTION SUMMARY:")
print(f" ⢠Collection: {collection_name}")
print(f" ⢠Mode: Keyword filtering applied (chunk-level)")
print(f" ⢠Documents processed: 0 (filtered out by keyword criteria)")
print(f" ⢠Blocks created: 0")
print(f" ⢠Keywords required: {filter_keywords}")
print("=" * 60)
return "" # Return empty string - no content to process
else:
if not keyword_filtering_enabled:
print(f" ā ļø Keyword filtering disabled in settings")
else:
print(f" ā ļø No keyword filtering applied (keywords: {filter_keywords}, processing_steps: {data.get('processing_steps', [])})")
# Without keyword filtering, use all candidates
for i, doc in enumerate(candidate_docs['documents']):
embedding = candidate_docs['embeddings'][i] if i < len(candidate_docs['embeddings']) else None
filtered_docs.append({
'id': candidate_docs['ids'][i],
'document': doc,
'metadata': candidate_docs['metadatas'][i],
'embedding': embedding
})
print(f" ā
Using all {len(filtered_docs)} candidates (no filtering applied)")
# STEP 3: Process using similarity threshold to remove near-duplicates
print(f"\nš BASIC FILTERING:")
min_doc_length = 30
print(f" ⢠Minimum document length: {min_doc_length} characters")
#max_token_length = 10000 # Avoid documents that are too long
# Apply basic filtering (length, tokens)
candidates = [doc for doc in filtered_docs
if len(doc['document']) >= min_doc_length ]
print(f" ā
Candidates after length filtering: {len(candidates)} (removed {len(filtered_docs) - len(candidates)})")
# Apply similarity filtering to remove near-duplicates
print(f"\nš SIMILARITY DEDUPLICATION:")
print(f" ⢠Similarity threshold: 0.95 (documents above this are considered duplicates)")
selected_docs = []
selected_embeddings = []
similarity_threshold = 0.95
embeddings_processed = 0
embeddings_skipped = 0
duplicates_removed = 0
for candidate in candidates:
# Skip documents without embeddings
if candidate['embedding'] is None or not isinstance(candidate['embedding'], (list, np.ndarray)):
embeddings_skipped += 1
continue
embeddings_processed += 1
candidate_embedding = np.array(candidate['embedding'])
# Normalize embedding
norm = np.linalg.norm(candidate_embedding)
if norm > 0:
candidate_embedding = candidate_embedding / norm
# Check if candidate is too similar to any already selected document
is_redundant = False
for sel_emb in selected_embeddings:
similarity = np.dot(candidate_embedding, sel_emb)
if similarity >= similarity_threshold:
is_redundant = True
duplicates_removed += 1
break
if not is_redundant:
selected_docs.append(candidate)
selected_embeddings.append(candidate_embedding)
print(f" ⢠Embeddings processed: {embeddings_processed}")
print(f" ⢠Embeddings skipped (missing): {embeddings_skipped}")
print(f" ⢠Duplicates removed: {duplicates_removed}")
print(f" ā
Unique documents after similarity filtering: {len(selected_docs)}")
# STEP 4: Apply cross-encoder reranking if requested
print(f"\nšÆ FINAL DOCUMENT SELECTION:")
final_docs = []
# Determine target based on extensive search mode
enable_extensive_search = self.flow_control.get("enable_extensive_search", False)
extensive_search_chunks = self.flow_control.get("extensive_search_chunks", 100)
if enable_extensive_search:
# For extensive search, we need more documents to reach the chunk target
effective_target = min(len(selected_docs), extensive_search_chunks)
print(f" š Extensive search mode: targeting {effective_target} documents for processing")
else:
effective_target = target_docs
if use_crossencoder and len(selected_docs) > effective_target:
print(f" š Applying cross-encoder reranking to {len(selected_docs)} documents")
from sentence_transformers import CrossEncoder
cross_model = CrossEncoder('BAAI/bge-reranker-base')
# Create query-document pairs for the reranker
query_doc_pairs = [(query, doc['document']) for doc in selected_docs]
scores = cross_model.predict(query_doc_pairs)
print(f" ⢠Cross-encoder scores computed")
print(f" ⢠Score range: {min(scores):.4f} to {max(scores):.4f}")
# Sort by score (highest first)
doc_score_pairs = list(zip(selected_docs, scores))
ranked_docs = sorted(doc_score_pairs, key=lambda x: x[1], reverse=True)
# Select top documents after reranking
final_docs = [doc for doc, _ in ranked_docs[:effective_target]]
print(f" ā
Selected top {len(final_docs)} documents after cross-encoder reranking")
else:
# If not using cross-encoder or don't have enough docs, use all selected docs
final_docs = selected_docs[:effective_target]
print(f" š Selected {len(final_docs)} documents by similarity ranking (no cross-encoder)")
# STEP 5: If we still don't have enough documents, try unfiltered search
if len(final_docs) < target_docs and len(final_docs) < len(candidates):
additional_needed = target_docs - len(final_docs)
print(f" š Adding {additional_needed} more documents to reach target of {target_docs}")
# Find documents not already selected
remaining_docs = [doc for doc in candidates if doc['id'] not in [d['id'] for d in final_docs]]
# Add up to the target number
final_docs.extend(remaining_docs[:target_docs - len(final_docs)])
print(f" ā
Final document count: {len(final_docs)}")
# STEP 6: Process final documents and create blocks
print(f"\nš DOCUMENT PROCESSING:")
print(f" ⢠Processing {len(final_docs)} selected documents")
# Check if any documents remain for processing
if len(final_docs) == 0:
print(f" ā No documents available for processing after all filtering steps")
print(f" š« Collection '{collection_name}' has no content that matches the criteria")
print(f"\nšÆ CHROMADB COLLECTION SUMMARY:")
print(f" ⢠Collection: {collection_name}")
print(f" ⢠Mode: All filtering applied")
print(f" ⢠Documents processed: 0 (filtered out by various criteria)")
print(f" ⢠Blocks created: 0")
if filter_keywords:
print(f" ⢠Keywords required: {filter_keywords}")
print("=" * 60)
return "" # Return empty string - no content to process
# Check if extensive search is enabled
enable_extensive_search = self.flow_control.get("enable_extensive_search", False)
extensive_search_chunks = self.flow_control.get("extensive_search_chunks", 100)
print(f" ⢠Extensive search enabled: {enable_extensive_search}")
if enable_extensive_search:
print(f" ⢠Extensive search chunk limit: {extensive_search_chunks}")
if enable_extensive_search and self.extensive_search_manager:
print(f" š EXTENSIVE SEARCH PROCESSING:")
print(f" ⢠Processing {len(final_docs)} documents with full document retrieval")
# If extensive search requested, retrieve more chunks
if len(final_docs) < extensive_search_chunks:
additional_needed = extensive_search_chunks - len(final_docs)
print(f" ⢠Retrieving {additional_needed} additional chunks for extensive search (target: {extensive_search_chunks})")
# Get more documents for extensive search - prioritize by:
# 1. Documents that have multiple chunks (for better document completion)
# 2. Documents that passed keyword filtering but were filtered out by similarity
# 3. Remaining candidates by similarity score
remaining_docs = [doc for doc in candidates if doc['id'] not in [d['id'] for d in final_docs]]
# Try to group by document to get complete documents
doc_groups = {}
for doc in remaining_docs:
bibtex = doc['metadata'].get('bibtex', '')
if bibtex:
# Use document path as grouping key
doc_key = bibtex.split('/')[-1] if '/' in bibtex else bibtex
if doc_key not in doc_groups:
doc_groups[doc_key] = []
doc_groups[doc_key].append(doc)
# Prioritize document groups that have multiple chunks
prioritized_docs = []
multi_chunk_docs = 0
# First add documents with multiple chunks (complete documents)
for doc_key, doc_list in sorted(doc_groups.items(), key=lambda x: len(x[1]), reverse=True):
if len(doc_list) > 1: # Multiple chunks from same document
print(f" ⢠Adding {len(doc_list)} chunks from document: {doc_key}")
prioritized_docs.extend(doc_list)
multi_chunk_docs += 1
if len(prioritized_docs) >= additional_needed:
break
# Then add remaining single chunks if needed
if len(prioritized_docs) < additional_needed:
single_chunks = []
for doc_key, doc_list in doc_groups.items():
if len(doc_list) == 1:
single_chunks.extend(doc_list)
# Sort single chunks by position in original candidates (which are sorted by similarity)
remaining_needed = additional_needed - len(prioritized_docs)
prioritized_docs.extend(single_chunks[:remaining_needed])
final_docs.extend(prioritized_docs[:additional_needed])
print(f" ⢠Extended to {len(final_docs)} documents for extensive processing")
print(f" ⢠Added {multi_chunk_docs} complete documents and {len(prioritized_docs) - multi_chunk_docs} individual chunks")
# Process documents with improved source tracking and batch processing
target_summary_tokens = self.flow_control.get("target_summary_tokens", 8000)
print(f" ⢠Processing documents with batch consolidation...")
print(f" ⢠Target final summary size: ~{target_summary_tokens:,} tokens (suitable for main LLM)")
# Prepare documents for processing
documents_for_processing = []
full_docs_retrieved = 0
for idx, doc in enumerate(final_docs):
print(f" Processing document {idx+1}/{len(final_docs)}: {doc['id'][:20]}...")
# Try to get full document
full_document = self.extensive_search_manager.get_full_document(
doc['metadata'],
collection_name
)
current_content = ""
if full_document:
full_docs_retrieved += 1
current_content = full_document
print(f" ā
Full document retrieved: {self.count_tokens(full_document):,} tokens")
else:
# Fallback to original chunk content
current_content = doc['document']
print(f" ā ļø Using original chunk: {self.count_tokens(current_content):,} tokens")
# Prepare document metadata for source tracking
metadata = {
'id': doc['id'],
'source': collection_name,
'title': doc.get('metadata', {}).get('title', 'Unknown Document'),
'parent_name': doc.get('metadata', {}).get('parent_name', 'Unknown Source'),
'bibtex': doc.get('metadata', {}).get('bibtex', '')
}
# Improve title extraction - use filename if title is generic
if metadata['title'] == 'Unknown Document':
# Try to extract a better title from parent_name or bibtex
if metadata['parent_name'] and metadata['parent_name'] != 'Unknown Source':
# Use filename as title
filename = metadata['parent_name'].split('/')[-1] if '/' in metadata['parent_name'] else metadata['parent_name']
metadata['title'] = filename
elif metadata['bibtex']:
# Use filename from bibtex path
filename = metadata['bibtex'].split('/')[-1] if '/' in metadata['bibtex'] else metadata['bibtex']
metadata['title'] = filename
documents_for_processing.append((metadata, current_content))
print(f" ⢠Documents prepared: {len(documents_for_processing)}")
print(f" ⢠Full documents retrieved: {full_docs_retrieved}")
# APPLY KEYWORD FILTERING ON FULL DOCUMENTS FOR EXTENSIVE SEARCH MODE
keyword_filtered_documents = documents_for_processing
keyword_filtering_enabled = self.flow_control.get("enable_keyword_filtering", False)
if keyword_filtering_enabled and filter_keywords and "keyword_filter" in data.get("processing_steps", []):
print(f"\n š APPLYING KEYWORD FILTERING TO FULL DOCUMENTS:")
print(f" ⢠Filter keywords: {filter_keywords}")
print(f" ⢠Evaluating {len(documents_for_processing)} full documents")
# Apply keyword filtering to full document content
all_keywords_docs = []
any_keywords_docs = []
for metadata, full_content in documents_for_processing:
full_content_lower = full_content.lower()
doc_title = metadata.get('title', 'Unknown')[:50]
# Create searchable text that includes content, title, filename, and other metadata
searchable_text = full_content_lower
# Add title to searchable text
if metadata.get('title') and metadata.get('title') != 'Unknown Document':
searchable_text += " " + metadata.get('title', '').lower()
# Add parent_name (often contains filename/path info)
if metadata.get('parent_name') and metadata.get('parent_name') != 'Unknown Source':
searchable_text += " " + metadata.get('parent_name', '').lower()
# Add bibtex field (often contains filename)
if metadata.get('bibtex'):
searchable_text += " " + metadata.get('bibtex', '').lower()
# Add source information
if metadata.get('source'):
searchable_text += " " + metadata.get('source', '').lower()
# Check if document contains ALL keywords (in content or metadata)
if all(keyword.lower() in searchable_text for keyword in filter_keywords):
all_keywords_docs.append((metadata, full_content))
print(f" ā
ALL keywords found in: {doc_title}")
# Check if document contains ANY keyword (in content or metadata)
elif any(keyword.lower() in searchable_text for keyword in filter_keywords):
any_keywords_docs.append((metadata, full_content))
print(f" ā” SOME keywords found in: {doc_title}")
else:
print(f" ā NO keywords found in: {doc_title}")
# Prioritize documents with ALL keywords, then add ANY keywords if needed
keyword_filtered_documents = all_keywords_docs.copy()
# Calculate how many more documents we need
target_after_filtering = min(len(documents_for_processing), target_summary_tokens // 200) # Rough estimate
if len(keyword_filtered_documents) < target_after_filtering and any_keywords_docs:
additional_needed = target_after_filtering - len(keyword_filtered_documents)
keyword_filtered_documents.extend(any_keywords_docs[:additional_needed])
print(f" ⢠Added {min(additional_needed, len(any_keywords_docs))} documents with partial keyword matches")
print(f" š KEYWORD FILTERING RESULTS:")
print(f" ⢠Documents with ALL keywords: {len(all_keywords_docs)}")
print(f" ⢠Documents with ANY keywords: {len(any_keywords_docs)}")
print(f" ⢠Documents with NO keywords: {len(documents_for_processing) - len(all_keywords_docs) - len(any_keywords_docs)}")
print(f" ⢠Final filtered documents: {len(keyword_filtered_documents)}")
print(f" ⢠Documents removed by filtering: {len(documents_for_processing) - len(keyword_filtered_documents)}")
if len(keyword_filtered_documents) == 0:
print(f" ā KEYWORD FILTERING: No documents match keyword criteria '{', '.join(filter_keywords)}'")
print(f" š« Respecting filtering constraints - no content will be generated from this collection.")
# Don't fall back to original documents - respect the filtering
# keyword_filtered_documents remains empty
else:
if keyword_filtering_enabled:
print(f"\n ā ļø Keyword filtering enabled but conditions not met:")
print(f" ⢠Keywords available: {bool(filter_keywords)}")
print(f" ⢠Processing steps include keyword_filter: {'keyword_filter' in data.get('processing_steps', [])}")
else:
print(f"\n ā ļø Keyword filtering disabled for extensive search")
# Use keyword-filtered documents for further processing
documents_for_processing = keyword_filtered_documents
# Check if any documents remain after keyword filtering
if len(documents_for_processing) == 0:
print(f" ā No documents available for processing after keyword filtering")
print(f" š« Skipping collection '{collection_name}' - no matching content found")
print(f"\nšÆ CHROMADB COLLECTION SUMMARY:")
print(f" ⢠Collection: {collection_name}")
print(f" ⢠Mode: Keyword filtering applied")
print(f" ⢠Documents processed: 0 (filtered out by keyword criteria)")
print(f" ⢠Blocks created: 0")
print(f" ⢠Keywords required: {filter_keywords}")
print("=" * 60)
return "" # Return empty string - no content to process
# Process documents with NO citation processing in extensive search mode
comprehensive_summary, source_mapping, individual_summaries = self.extensive_search_manager.process_documents_with_source_tracking(
documents_for_processing,
query,
target_summary_tokens=target_summary_tokens,
batch_size=10, # Process 10 documents at a time with large LLM
use_inline_citations=False, # Extensive search mode: no inline citations
disable_citations=True, # Completely disable citation logic for extensive search
detail_level=detail_level
)
print(f" ⢠Comprehensive summary generated (no citation processing)")
print(f" ⢠Summary tokens: {self.count_tokens(comprehensive_summary):,}")
# APPLY REFERENCE RELEVANCE FILTERING IF ENABLED
filtered_documents_for_processing = documents_for_processing
if self.flow_control.get("enable_reference_filtering", True):
relevance_threshold = self.flow_control.get("relevance_threshold", 0.3)
print(f" šÆ Applying reference relevance filtering...")
# Score documents against the comprehensive summary
relevant_docs_with_scores = self.score_reference_relevance(
comprehensive_summary,
documents_for_processing,
relevance_threshold
)
# Extract just the metadata and content (remove scores for compatibility)
filtered_documents_for_processing = [(metadata, content) for metadata, content, score in relevant_docs_with_scores]
print(f" ⢠References before filtering: {len(documents_for_processing)}")
print(f" ⢠References after filtering: {len(filtered_documents_for_processing)}")
print(f" ⢠References removed: {len(documents_for_processing) - len(filtered_documents_for_processing)}")
else:
print(f" ā ļø Reference relevance filtering disabled")
# BUILD PROPERLY FORMATTED REFERENCE LIST FROM FILTERED DOCUMENTS
print(f" ⢠Building reference list from {len(filtered_documents_for_processing)} filtered documents...")
# Use ReferenceManager for consistent formatting
ref_manager = ReferenceManager(default_style="apa")
reference_list = []
# Create properly formatted reference entries for FILTERED documents
for i, (metadata, _) in enumerate(filtered_documents_for_processing):
ref_num = i + 1
title = metadata.get('title', f'Document {ref_num}')
source_info = metadata.get('bibtex', metadata.get('parent_name', 'Unknown Source'))
# Format based on source type using ReferenceManager methods
if metadata.get('bibtex', '') and '@' in metadata.get('bibtex', ''):
# Literature reference - use BibTeX formatting
try:
citation_marker, reference = ref_manager.format_bibtex_reference(
metadata['bibtex'], str(ref_num), "apa"
)
reference_list.append(reference)
except Exception as e:
print(f" ā ļø Error formatting BibTeX for ref {ref_num}: {e}")
reference_list.append(f"[{ref_num}]: {title} (Literature)")
elif any(ext in source_info.lower() for ext in ['.pdf', '.docx', '.pptx', '.xlsx', '.txt']):
# Document reference - use file formatting with clickable links
try:
doc_path = source_info
# Don't use title as description if it's the same as filename
filename = doc_path.split('/')[-1] if '/' in doc_path else doc_path
description = title if title != filename and title != f'Document {ref_num}' and title != 'Unknown Document' else None
citation_marker, reference = ref_manager.process_file_reference(
doc_path, str(ref_num), description
)
reference_list.append(reference)
except Exception as e:
print(f" ā ļø Error formatting file reference for ref {ref_num}: {e}")
doc_name = source_info.split('/')[-1] if '/' in source_info else source_info
reference_list.append(f"[{ref_num}]: <a href='file:///{source_info}' target='_blank'>{doc_name}</a>")
else:
# Generic reference - basic format with collection info
reference_list.append(f"[{ref_num}]: {title} (Collection: {collection_name})")
print(f" ⢠Created properly formatted reference list with {len(reference_list)} entries")
# Create the final block with summary + simple reference list
reference_section = "\n\n---\n\n**References:**\n" + "\n".join(reference_list)
final_content = comprehensive_summary + reference_section
# Add as single comprehensive block
collected_blocks.append(f"[block {self.block_counter}] {final_content}")
# Store metadata for this comprehensive block
self.blocks_dict[self.block_counter] = {
"type": "comprehensive_extensive",
"id": f"extensive_{self.block_counter}",
"content": f"Comprehensive summary from {len(documents_for_processing)} documents in ChromaDB collection '{collection_name}' (extensive search mode - filtered reference list)",
"documents_processed": len(documents_for_processing),
"documents_in_references": len(filtered_documents_for_processing),
"full_docs_retrieved": full_docs_retrieved,
"source_batches": len(source_mapping),
"reference_count": len(reference_list),
"reference_section": reference_section, # Store the reference section separately
"comprehensive_summary": comprehensive_summary # Store the summary separately
}
print(f" ā
Created single comprehensive block {self.block_counter}")
print(f" ⢠Content length: {len(final_content):,} characters")
print(f" ⢠References listed: {len(reference_list)}")
self.block_counter += 1
else:
# Standard processing without extensive search
print(f" š STANDARD PROCESSING:")
print(f" ⢠Processing {len(final_docs)} documents without extensive search")
for i, doc in enumerate(final_docs):
print(f" Processing document {i+1}/{len(final_docs)}: {doc['metadata'].get('bibtex', doc['id'])[:60]}...")
# Extract the most relevant content using query-based extractor
extracted_content = self.extract_for_queries(doc['document'], self.extended_query)
# Add reference in formatted text
collected_blocks.append(f"[block {self.block_counter}] {extracted_content}")
# Create proper blocks_dict entry
filepath = doc['metadata'].get('bibtex', doc['metadata'].get('path', ''))
if filepath and filepath.lower().endswith(('.pptx', '.docx', '.xlsx', '.pdf', '.csv', '.txt')):
self.blocks_dict[self.block_counter] = {
"type": "document",
"id": f"doc_{self.block_counter}",
"path": filepath,
"description": f"Document from ChromaDB collection '{collection_name}'"
}
elif '@article' in filepath:
self.blocks_dict[self.block_counter] = {
"type": "literature",
"id": f"doc_{self.block_counter}",
"bibtex": filepath,
"description": f"Document from ChromaDB collection '{collection_name}'"
}
else:
self.blocks_dict[self.block_counter] = {
"type": "generic",
"id": f"ref_{self.block_counter}",
"content": f"ChromaDB: {collection_name} - {filepath}"
}
self.block_counter += 1
print(f" ā
Created {len(final_docs)} standard blocks")
print(f"\nšÆ CHROMADB COLLECTION SUMMARY:")
print(f" ⢠Collection: {collection_name}")
if enable_extensive_search:
print(f" ⢠Mode: Extensive search (single comprehensive block + simple reference list, no citations)")
print(f" ⢠Documents processed: {len(final_docs)}")
print(f" ⢠Blocks created: 1 comprehensive block")
else:
print(f" ⢠Mode: Standard processing (individual blocks with inline citations)")
print(f" ⢠Documents processed: {len(final_docs)}")
print(f" ⢠Blocks created: {len(final_docs)} individual blocks")
print(f" ⢠Total characters: {sum(len(block) for block in collected_blocks):,}")
print("=" * 60)
return "\n".join(collected_blocks)
def get_embedding(self, text):
"""Generate an embedding for the given text using OpenAI's text-embedding model."""
# Use direct client instead of module-level API to avoid ambiguity errors
from openai import OpenAI
try:
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
response = client.embeddings.create(
model="text-embedding-3-small",
input=text
)
embedding = response.data[0].embedding
return embedding
except Exception as e:
print(f"Error generating embedding: {str(e)}")
# Return an empty embedding in case of error
return [0.0] * 1536 # Typical dimension for text-embedding-3-small
def collect_data_from_neo4j(self, data, query, doc_type="literature data", detail_level="Balanced"):
"""
Collect relevant documents from Neo4j using keyword pre-filtering and FAISS vector search
Args:
data: Dictionary containing search configuration
query: User's query text
doc_type: Type of documents to search (default: "literature data")
detail_level: Level of detail for the summary (Summary, Balanced, Detailed, Comprehensive)
Returns:
String with formatted text blocks for LLM context
"""
collected_blocks = []
# Handle query extension if needed
if "extend_query" in data["processing_steps"]:
self.extend_query(query)
self.extended_query.append(query)
else:
self.extended_query = [query]
partial_inclusions=max(2,data['inclusions']//len(self.extended_query)+1)
# Set up embeddings
embedding_function = OpenAIEmbeddings(model="text-embedding-3-small")
# Set up a collector for documents processed
docs_added = 0
#target_docs = data["inclusions"]
self.litt_nodes_cache = None
self.litt_VS_cache = None
# Process each query (original and any extensions)
for q in self.extended_query:
print(f"Working on query: {q}")
# First, get embeddings for the query
query_embedding = self.get_embedding(q)
# Process Text_chunks
text_chunks_processed = self._process_node_type(
data['data'], # Base Cypher filter
query_embedding,
q,
"Text_chunk",
"Text",
partial_inclusions,
embedding_function,
doc_type
)
if text_chunks_processed:
collected_blocks.extend(text_chunks_processed["blocks"])
docs_added += len(text_chunks_processed["blocks"])
print(f"Added {len(text_chunks_processed.get('blocks', []))} text chunks")
# If we still need more documents, try Table_chunks
# if docs_added < target_docs:
# table_chunks_processed = self._process_node_type(
# data['data'], # Base Cypher filter
# query_embedding,
# q,
# "Table_chunk",
# "Html", # Use Html for tables
# initial_k,
# embedding_function,
# doc_type,
# target_docs - docs_added
# )
# if table_chunks_processed:
# collected_blocks.extend(table_chunks_processed["blocks"])
# docs_added += len(table_chunks_processed["blocks"])
# print(f"Added {len(table_chunks_processed.get('blocks', []))} table chunks")
# # If we have enough documents, stop processing queries
# if docs_added >= target_docs:
# break
print(f"Total blocks added: {len(collected_blocks)}")
return "\n".join(collected_blocks)
def collect_data_from_neo4j_new(self, data, query, doc_type="literature data", detail_level="Balanced"):
"""
Collect relevant documents from Neo4j using optimized workflow:
1) Combine results from all extended queries
2) Apply keyword filters across all results
3) Remove similar documents and apply cross-encoder if requested
4) Evaluate against target and add additional documents as needed
Args:
data: Dictionary containing search configuration
query: User's query text
doc_type: Type of documents to search (default: "literature data")
detail_level: Level of detail for the summary (Summary, Balanced, Detailed, Comprehensive)
Returns:
String with formatted text blocks for LLM context
"""
print("\n" + "=" * 60)
print(f"š NEO4J LITERATURE SEARCH: {doc_type}")
print("=" * 60)
# Clear cache for new query
self.litt_nodes_cache = None
self.litt_VS_cache = None
collected_blocks = []
# Configure retrieval parameters
use_crossencoder = "crossencoder" in data["processing_steps"]
target_docs = data["inclusions"]
initial_k = target_docs * 10 if use_crossencoder else target_docs * 3
print(f"š Retrieval Configuration:")
print(f" ⢠Target documents: {target_docs}")
print(f" ⢠Initial retrieval (k): {initial_k}")
print(f" ⢠Cross-encoder enabled: {use_crossencoder}")
print(f" ⢠Processing steps: {data['processing_steps']}")
# Handle query extension if needed
if "extend_query" in data["processing_steps"]:
print("š Extending query with additional variations...")
self.extend_query(query)
self.extended_query.append(query)
print(f"ā
Query extension complete. Total queries: {len(self.extended_query)}")
else:
self.extended_query = [query]
print(f"š Using single query (no extension)")
# Set up embeddings for vector search
embedding_function = OpenAIEmbeddings(model="text-embedding-3-small")
# STEP 1: Get query embedding for vector search
query_embedding = self.get_embedding(query)
# STEP 2: Retrieve nodes from Neo4j for all queries together
# This retrieval is based solely on cypher queries from all extended queries
all_nodes = []
all_node_ids = set() # Track retrieved node IDs to avoid duplicates
print(f"Retrieving Neo4j nodes")
# Extract keywords for this query to use in Neo4j filtering
filter_keywords = []
if "keyword_filter" in data.get("processing_steps", []):
keyword_filtering_enabled = self.flow_control.get("enable_keyword_filtering", False)
if keyword_filtering_enabled:
# Check for manual keywords first
manual_keywords_str = self.flow_control.get("manual_keywords", "").strip()
if manual_keywords_str:
# Scenario 1: Manual keywords provided - use ONLY those
manual_keywords = [kw.strip() for kw in manual_keywords_str.split(',') if kw.strip()]
filter_keywords = manual_keywords
print(f"Using manual keywords for Neo4j: {filter_keywords}")
else:
# Scenario 2: No manual keywords - use automatic LLM extraction
filter_keywords = self.extract_filter_keywords(query, n_keywords=3)  # use the original query; the per-query loop variable is not defined yet at this point
print(f"Using auto-extracted keywords for Neo4j: {filter_keywords}")
else:
print("Keyword filtering disabled - no keyword filtering for Neo4j")
else:
print("Keyword filter not in processing steps - no keyword filtering for Neo4j")
# Process base query based on type
if isinstance(data['data'], list):
# Multiple query variants
for query_variant in data['data']:
nodes = self._fetch_neo4j_nodes(
query_variant,
filter_keywords,
"Text_chunk",
"Text",
initial_k
)
# Add unique nodes to collection
for node in nodes:
if node["uid"] not in all_node_ids:
all_node_ids.add(node["uid"])
all_nodes.append(node)
else:
# Single query string
nodes = self._fetch_neo4j_nodes(
data['data'],
filter_keywords,
"Text_chunk",
"Text",
initial_k
)
# Add unique nodes to collection
for node in nodes:
if node["uid"] not in all_node_ids:
all_node_ids.add(node["uid"])
all_nodes.append(node)
print(f"Retrieved {len(all_nodes)} unique nodes from Neo4j")
# Cache all retrieved nodes
self.litt_nodes_cache = all_nodes
for q in self.extended_query:
# STEP 3: Filter nodes by basic criteria (length, token count)
min_doc_length = 30
max_token_length = 15000 # Avoid very long documents
filtered_nodes = [
node for node in all_nodes
if node["content"] and len(node["content"]) >= min_doc_length ]
print(f"Have {len(filtered_nodes)} nodes after basic filtering")
# If no filtered nodes, return empty result
if not filtered_nodes:
return ""
# STEP 4: Apply vector search with similarity threshold to remove near-duplicates
selected_nodes = []
selected_embeddings = []
similarity_threshold = 0.95
# First generate embeddings for all filtered nodes
node_embeddings = []
for node in filtered_nodes:
try:
content = node["content"]
if self.count_tokens(content) > 8192:
# Summarize very long content
from openai import OpenAI
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
summarized = self.extract_for_queries(content, self.extended_query[:1])
embedding = client.embeddings.create(
model="text-embedding-3-small",
input=summarized
).data[0].embedding
else:
# Use original content for shorter texts
embedding = self.get_embedding(content)
node_embeddings.append((node, embedding))
except Exception as e:
print(f"Error generating embedding for node {node['uid']}: {str(e)}")
# Skip this node
continue
# Apply similarity filtering
query_embedding_array = np.array(query_embedding)
for node, embedding in node_embeddings:
embedding_array = np.array(embedding)
# Normalize embeddings
norm = np.linalg.norm(embedding_array)
if norm > 0:
embedding_array = embedding_array / norm
# Check if too similar to any already selected node
is_redundant = False
for sel_emb in selected_embeddings:
similarity = np.dot(embedding_array, sel_emb)
if similarity >= similarity_threshold:
is_redundant = True
break
if not is_redundant:
selected_nodes.append(node)
selected_embeddings.append(embedding_array)
print(f"Selected {len(selected_nodes)} nodes after similarity filtering")
# STEP 5: Apply cross-encoder reranking if requested
final_nodes = []
if use_crossencoder and len(selected_nodes) > target_docs:
print("Applying cross-encoder reranking")
from sentence_transformers import CrossEncoder
cross_model = CrossEncoder('BAAI/bge-reranker-base')
# Create query-document pairs for the reranker
query_doc_pairs = [(query, node["content"]) for node in selected_nodes]
scores = cross_model.predict(query_doc_pairs)
# Sort by score (highest first)
node_score_pairs = list(zip(selected_nodes, scores))
ranked_nodes = sorted(node_score_pairs, key=lambda x: x[1], reverse=True)
# Select top nodes after reranking
final_nodes = [node for node, _ in ranked_nodes[:target_docs]]
else:
# If not using cross-encoder or don't have enough docs, use all selected docs
final_nodes = selected_nodes[:target_docs]
# STEP 6: Process final nodes into blocks
print(f"Processing final {len(final_nodes)} nodes into blocks")
# Check if extensive search is enabled
enable_extensive_search = self.flow_control.get("enable_extensive_search", False)
extensive_search_chunks = self.flow_control.get("extensive_search_chunks", 100)
if enable_extensive_search and self.extensive_search_manager:
print(f"š Extensive search enabled for Neo4j - processing {len(final_nodes)} nodes with full document retrieval")
# If extensive search requested, retrieve more chunks
if len(final_nodes) < extensive_search_chunks:
print(f"Retrieving additional chunks for extensive search (target: {extensive_search_chunks})")
# Get more nodes for extensive search
additional_needed = extensive_search_chunks - len(final_nodes)
remaining_nodes = [node for node in selected_nodes if node["uid"] not in [n["uid"] for n in final_nodes]]
final_nodes.extend(remaining_nodes[:additional_needed])
# Process nodes with full document retrieval and summarization
documents_with_full_context = []
for node in final_nodes:
uid = node["uid"]
# Try to get full document from Neo4j
full_document = self.extensive_search_manager.get_full_document_neo4j(uid)
if full_document:
# Summarize the full document with query context
summarized_doc = self.extensive_search_manager.summarize_document(
full_document,
query,
max_tokens=1500
)
# Create metadata-like dict for compatibility
metadata = {
'title': node.get("parent_name", "Unknown Document"),
'parent_name': node["parent_name"],
'bibtex': node["bibtex"],
'uid': uid,
'node_type': node["node_type"]
}
documents_with_full_context.append((metadata, summarized_doc))
else:
# Fallback to original chunk content
metadata = {
'title': node.get("parent_name", "Unknown Document"),
'parent_name': node["parent_name"],
'bibtex': node["bibtex"],
'uid': uid,
'node_type': node["node_type"]
}
documents_with_full_context.append((metadata, node["content"]))
# Remove duplicate documents based on content similarity
unique_documents = self.extensive_search_manager.remove_duplicate_documents(
documents_with_full_context,
similarity_threshold=0.85
)
print(f"After deduplication: {len(unique_documents)} unique documents")
# APPLY KEYWORD FILTERING ON FULL DOCUMENTS FOR EXTENSIVE SEARCH MODE (NEO4J)
keyword_filtered_unique_documents = unique_documents
keyword_filtering_enabled = self.flow_control.get("enable_keyword_filtering", False)
# Extract filter keywords for Neo4j (same logic as ChromaDB section)
filter_keywords = []
if keyword_filtering_enabled:
# Check for manual keywords first
manual_keywords_str = self.flow_control.get("manual_keywords", "").strip()
if manual_keywords_str:
# Scenario 1: Manual keywords provided - use ONLY those
manual_keywords = [kw.strip() for kw in manual_keywords_str.split(',') if kw.strip()]
filter_keywords = manual_keywords
else:
# Scenario 2: No manual keywords - use automatic LLM extraction
filter_keywords = self.extract_filter_keywords(query)
if keyword_filtering_enabled and filter_keywords and "keyword_filter" in data.get("processing_steps", []):
print(f"\n š APPLYING KEYWORD FILTERING TO FULL DOCUMENTS (NEO4J):")
print(f" ⢠Filter keywords: {filter_keywords}")
print(f" ⢠Evaluating {len(unique_documents)} full documents")
# Apply keyword filtering to full document content
all_keywords_docs = []
any_keywords_docs = []
for metadata, full_content in unique_documents:
full_content_lower = full_content.lower()
doc_title = metadata.get('title', metadata.get('parent_name', 'Unknown'))[:50]
# Create searchable text that includes content, title, filename, and other metadata
searchable_text = full_content_lower
# Add title to searchable text
if metadata.get('title') and metadata.get('title') != 'Unknown Document':
searchable_text += " " + metadata.get('title', '').lower()
# Add parent_name (often contains filename/path info)
if metadata.get('parent_name') and metadata.get('parent_name') != 'Unknown Source':
searchable_text += " " + metadata.get('parent_name', '').lower()
# Add bibtex field (often contains filename)
if metadata.get('bibtex'):
searchable_text += " " + metadata.get('bibtex', '').lower()
# Add source information
if metadata.get('source'):
searchable_text += " " + metadata.get('source', '').lower()
# Check if document contains ALL keywords (in content or metadata)
if all(keyword.lower() in searchable_text for keyword in filter_keywords):
all_keywords_docs.append((metadata, full_content))
print(f" ā
ALL keywords found in: {doc_title}")
# Check if document contains ANY keyword (in content or metadata)
elif any(keyword.lower() in searchable_text for keyword in filter_keywords):
any_keywords_docs.append((metadata, full_content))
print(f" ā” SOME keywords found in: {doc_title}")
else:
print(f" ā NO keywords found in: {doc_title}")
# Prioritize documents with ALL keywords, then add ANY keywords if needed
keyword_filtered_unique_documents = all_keywords_docs.copy()
# Calculate how many more documents we need
target_summary_tokens = self.flow_control.get("target_summary_tokens", 8000)  # fetch here; this value was previously referenced before its later assignment
target_after_filtering = min(len(unique_documents), target_summary_tokens // 200) # Rough estimate
if len(keyword_filtered_unique_documents) < target_after_filtering and any_keywords_docs:
additional_needed = target_after_filtering - len(keyword_filtered_unique_documents)
keyword_filtered_unique_documents.extend(any_keywords_docs[:additional_needed])
print(f" • Added {min(additional_needed, len(any_keywords_docs))} documents with partial keyword matches")
print(f" š KEYWORD FILTERING RESULTS (NEO4J):")
print(f" ⢠Documents with ALL keywords: {len(all_keywords_docs)}")
print(f" ⢠Documents with ANY keywords: {len(any_keywords_docs)}")
print(f" ⢠Documents with NO keywords: {len(unique_documents) - len(all_keywords_docs) - len(any_keywords_docs)}")
print(f" ⢠Final filtered documents: {len(keyword_filtered_unique_documents)}")
print(f" ⢠Documents removed by filtering: {len(unique_documents) - len(keyword_filtered_unique_documents)}")
if len(keyword_filtered_unique_documents) == 0:
print(f" ā KEYWORD FILTERING (NEO4J): No documents match keyword criteria '{', '.join(filter_keywords)}'")
print(f" š« Respecting filtering constraints - no content will be generated from Neo4j.")
# Don't fall back to original documents - respect the filtering
# keyword_filtered_unique_documents remains empty
else:
if keyword_filtering_enabled:
print(f"\n ā ļø Keyword filtering enabled but conditions not met for Neo4j:")
print(f" ⢠Keywords available: {bool(filter_keywords)}")
print(f" ⢠Processing steps include keyword_filter: {'keyword_filter' in data.get('processing_steps', [])}")
else:
print(f"\n ā ļø Keyword filtering disabled for Neo4j extensive search")
# Use keyword-filtered documents for further processing
unique_documents = keyword_filtered_unique_documents
# Check if any documents remain after keyword filtering
if len(unique_documents) == 0:
print(f" ā No documents available for processing after keyword filtering (Neo4j)")
print(f" š« Skipping Neo4j collection - no matching content found")
print(f"\nšÆ NEO4J COLLECTION SUMMARY:")
print(f" ⢠Collection: Neo4j literature data")
print(f" ⢠Mode: Keyword filtering applied")
print(f" ⢠Documents processed: 0 (filtered out by keyword criteria)")
print(f" ⢠Blocks created: 0")
print(f" ⢠Keywords required: {filter_keywords}")
print("=" * 60)
return "" # Return empty string - no content to process
# Process with extensive search manager (no citation processing in extensive search mode)
target_summary_tokens = self.flow_control.get("target_summary_tokens", 8000)
comprehensive_summary, source_mapping, individual_summaries = self.extensive_search_manager.process_documents_with_source_tracking(
unique_documents,
query,
target_summary_tokens=target_summary_tokens,
batch_size=10,
use_inline_citations=False, # Extensive search mode: no inline citations
disable_citations=True, # Completely disable citation logic for extensive search
detail_level=detail_level
)
print(f" ⢠Comprehensive summary generated (no citation processing)")
print(f" ⢠Summary tokens: {self.count_tokens(comprehensive_summary):,}")
# APPLY REFERENCE RELEVANCE FILTERING IF ENABLED
filtered_unique_documents = unique_documents
if self.flow_control.get("enable_reference_filtering", True):
relevance_threshold = self.flow_control.get("relevance_threshold", 0.3)
print(f" šÆ Applying reference relevance filtering...")
# Score documents against the comprehensive summary
relevant_docs_with_scores = self.score_reference_relevance(
comprehensive_summary,
unique_documents,
relevance_threshold
)
# Extract just the metadata and content (remove scores for compatibility)
filtered_unique_documents = [(metadata, content) for metadata, content, score in relevant_docs_with_scores]
print(f" ⢠References before filtering: {len(unique_documents)}")
print(f" ⢠References after filtering: {len(filtered_unique_documents)}")
print(f" ⢠References removed: {len(unique_documents) - len(filtered_unique_documents)}")
else:
print(f" ā ļø Reference relevance filtering disabled")
# BUILD PROPERLY FORMATTED REFERENCE LIST FROM FILTERED DOCUMENTS (no citation extraction)
print(f" ⢠Building reference list from {len(filtered_unique_documents)} filtered documents...")
# Use ReferenceManager for consistent formatting
ref_manager = ReferenceManager(default_style="apa")
reference_list = []
# Create properly formatted reference entries for FILTERED documents
for i, (metadata, _) in enumerate(filtered_unique_documents):
ref_num = i + 1
title = metadata.get('title', metadata.get('parent_name', f'Document {ref_num}'))
bibtex = metadata.get('bibtex', '')
parent_name = metadata.get('parent_name', '')
# Improve title extraction - use filename if title is generic or same as parent_name
if title == f'Document {ref_num}' or title == parent_name:
if parent_name:
# Use filename as title
filename = parent_name.split('/')[-1] if '/' in parent_name else parent_name
title = filename
# Format based on source type using ReferenceManager methods
if bibtex and '@' in bibtex:
# Literature reference - use BibTeX formatting
try:
citation_marker, reference = ref_manager.format_bibtex_reference(
bibtex, str(ref_num), "apa"
)
reference_list.append(reference)
except Exception as e:
print(f" ā ļø Error formatting BibTeX for ref {ref_num}: {e}")
reference_list.append(f"[{ref_num}]: {title} (Literature)")
elif parent_name and any(ext in parent_name.lower() for ext in ['.pdf', '.docx', '.pptx', '.xlsx', '.txt']):
# Document reference - use file formatting with clickable links
try:
# Don't use title as description if it's the same as filename
filename = parent_name.split('/')[-1] if '/' in parent_name else parent_name
description = title if title != filename and title != f'Document {ref_num}' else None
citation_marker, reference = ref_manager.process_file_reference(
parent_name, str(ref_num), description
)
reference_list.append(reference)
except Exception as e:
print(f" ā ļø Error formatting file reference for ref {ref_num}: {e}")
reference_list.append(f"[{ref_num}]: <a href='file:///{parent_name}' target='_blank'>{parent_name}</a>")
else:
# Neo4j document reference - basic format
reference_list.append(f"[{ref_num}]: {title} (Neo4j document)")
print(f" ⢠Created properly formatted reference list with {len(reference_list)} entries")
# Create the final block with summary + simple reference list
reference_section = "\n\n---\n\n**References:**\n" + "\n".join(reference_list)
final_content = comprehensive_summary + reference_section
# Add as single comprehensive block
collected_blocks.append(f"[block {self.block_counter}] {final_content}")
# Store metadata for this comprehensive block
self.blocks_dict[self.block_counter] = {
"type": "comprehensive_extensive_neo4j",
"id": f"extensive_neo4j_{self.block_counter}",
"content": f"Comprehensive summary from {len(unique_documents)} documents in Neo4j (extensive search mode - filtered reference list)",
"documents_processed": len(unique_documents),
"documents_in_references": len(filtered_unique_documents),
"source_batches": len(source_mapping),
"reference_count": len(reference_list),
"reference_section": reference_section, # Store the reference section separately
"comprehensive_summary": comprehensive_summary # Store the summary separately
}
print(f"ā
Created single comprehensive Neo4j block {self.block_counter}")
print(f"⢠Content length: {len(final_content):,} characters")
print(f"⢠References listed: {len(reference_list)}")
self.block_counter += 1
else:
# Standard processing without extensive search
for node in final_nodes:
content = node["content"]
uid = node["uid"]
parent_name = node["parent_name"]
bibtex = node["bibtex"]
node_type = node["node_type"]
# Extract the most relevant content using query-based extractor
extracted_content = self.extract_for_queries(content, self.extended_query)
# Add reference in formatted text
block_text = f"[block {self.block_counter}] {extracted_content}"
collected_blocks.append(block_text)
# Create reference entry
if bibtex and '@' in bibtex:
# Literature reference with BibTeX
self.blocks_dict[self.block_counter] = {
"type": "literature",
"id": f"lit_{self.block_counter}",
"bibtex": bibtex,
"content": f"Neo4j literature: {parent_name}"
}
else:
# Generic document reference
self.blocks_dict[self.block_counter] = {
"type": "generic",
"id": f"doc_{self.block_counter}",
"content": f"Document: {parent_name}"
}
# Increment block counter
self.block_counter += 1
print(f"\nšÆ NEO4J LITERATURE SEARCH SUMMARY:")
if enable_extensive_search:
print(f" ⢠Mode: Extensive search (single comprehensive block + reference list)")
print(f" ⢠Documents processed: {len(final_nodes)}")
print(f" ⢠Blocks created: 1 comprehensive block")
else:
print(f" ⢠Mode: Standard processing (individual blocks with inline citations)")
print(f" ⢠Documents processed: {len(final_nodes)}")
print(f" ⢠Blocks created: {len(final_nodes)} individual blocks")
print(f" ⢠Total characters: {sum(len(block) for block in collected_blocks):,}")
print("=" * 60)
return "\n".join(collected_blocks)
def _fetch_neo4j_nodes(self, base_query, keywords, node_type, content_field, limit):
"""
Helper method to fetch nodes from Neo4j with keyword filtering
Args:
base_query: Base cypher query for filtering
keywords: Keywords for filtering results
node_type: Type of node to fetch
content_field: Field containing the content
limit: Maximum number of nodes to retrieve
Returns:
List of node dictionaries
"""
# Construct keyword clause for filtering
keyword_clauses = []
for keyword in keywords:
# Escape single quotes in keywords
safe_keyword = keyword.replace("'", "\\'")
keyword_clauses.append(f"x.{content_field} CONTAINS '{safe_keyword}'")
# Combine keyword clauses with OR
if keyword_clauses==[]:
keyword_filter = None
else:
keyword_filter = " OR ".join(keyword_clauses)
# Construct the final query with keyword filtering
import re
if keyword_filter:
if re.search(r"\bWHERE\b", base_query, flags=re.IGNORECASE):
# Inject the keyword filter into the first existing WHERE clause only
query_with_keywords = re.sub(r"\bWHERE\b", f"WHERE ({keyword_filter}) AND", base_query, count=1, flags=re.IGNORECASE)
else:
# Add new WHERE clause
query_with_keywords = f"{base_query} WHERE {keyword_filter}"
else:
# No keywords, use original query
query_with_keywords = base_query
# Complete the query to fetch publications and other metadata
cypher_query = f"""
{query_with_keywords}
MATCH (p:Publication)-->(x)
RETURN x.UID AS uid,
x.{content_field} AS content,
p.Name AS parent_name,
p.BibTex AS bibtex
LIMIT {limit}
"""
# Execute query and collect results
results = self.session.run(cypher_query)
nodes = []
for record in results:
nodes.append({
"uid": record["uid"],
"content": record["content"],
"parent_name": record["parent_name"],
"bibtex": record["bibtex"],
"node_type": node_type
})
return nodes
def _process_node_type(self, base_query, query_embedding, query_text, node_type, content_field, k, embedding_function, doc_type):
"""
Helper method to process a specific node type with FAISS vector search
Args:
base_query: Base cypher query for pre-filtering
query_embedding: Embedding vector for the query
query_text: Text of the query for cross-encoder ranking
node_type: Type of node to process (Text_chunk or Table_chunk)
content_field: Field containing the node content
k: Number of results to retrieve
embedding_function: Function to generate embeddings
doc_type: Type of document
max_results: Maximum number of results to return
Returns:
Dictionary with blocks added and other metadata
"""
import numpy as np
import faiss
from sentence_transformers import CrossEncoder
processing_steps = self.data_handles.handlers.get(doc_type, {}).get("processing_steps", [])
if "crossencoder" in processing_steps:
k_initial=k*10
else:
k_initial=k
# Step 1: Fetch pre-filtered nodes from Neo4j without vector search
# Instead of using the raw base_query, let's parse it properly
if not self.litt_nodes_cache:
if isinstance(base_query, list):
# If base_query is a list of query strings, run them separately
all_nodes = []
for query_variant in base_query:
# Use MATCH pattern instead of directly inserting raw query
cypher_query = f"""
{query_variant}
WITH x, count(distinct k) as keyword_count
ORDER BY keyword_count DESC
LIMIT {k_initial * 10}
MATCH (p:Publication)-->(x)
RETURN x.UID AS uid,
x.{content_field} AS content,
p.Name AS parent_name,
p.BibTex AS bibtex
"""
results = self.session.run(cypher_query)
for record in results:
all_nodes.append({
"uid": record["uid"],
"content": record["content"],
"parent_name": record["parent_name"],
"bibtex": record["bibtex"],
"node_type": node_type
})
nodes = all_nodes
else:
# For string base_query, use it as a filter
cypher_query = f"""
{base_query}
MATCH (p:Publication)-->(x)
RETURN x.UID AS uid,
x.{content_field} AS content,
p.Name AS parent_name,
p.BibTex AS bibtex
LIMIT {k_initial * 10}
"""
results = self.session.run(cypher_query)
nodes = []
for record in results:
nodes.append({
"uid": record["uid"],
"content": record["content"],
"parent_name": record["parent_name"],
"bibtex": record["bibtex"],
"node_type": node_type
})
self.litt_nodes_cache = nodes
else:
nodes = self.litt_nodes_cache
# Rest of the method remains the same...
contents = [node["content"] for node in nodes if node["content"] and len(node["content"]) >= 30]
metadata = [{
"uid": node["uid"],
"parent_name": node["parent_name"],
"bibtex": node["bibtex"],
"node_type": node["node_type"]
} for node in nodes if node["content"] and len(node["content"]) >= 30]
# If we didn't find any nodes, return empty result
if not contents:
return {"blocks": [], "count": 0}
# Continue with the rest of the method...
if len(contents) > 0:
try:
if not(self.litt_VS_cache):
# Use a direct OpenAI client without module-level API to avoid ambiguity
from openai import OpenAI
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
# Generate embeddings for all contents
content_embeddings = []
for content in contents:
try:
if self.count_tokens(content) > 8192:
# Summarize for very long content
content = self.summarize_text(content, os.environ["OPENAI_API_KEY"])
# Use direct client instead of module-level functions
response = client.embeddings.create(
model="text-embedding-3-small",
input=content
)
embedding = response.data[0].embedding
content_embeddings.append(embedding)
except Exception as e:
print(f"Error generating embedding: {str(e)}")
content_embeddings.append([0.0] * len(query_embedding)) # Empty embedding
# Create FAISS index
dimension = len(query_embedding)
index = faiss.IndexFlatL2(dimension)
# Add embeddings to index
if content_embeddings:
index.add(np.array(content_embeddings, dtype=np.float32))
self.litt_VS_cache = index
else:
index=self.litt_VS_cache
try:
# Search for similar vectors
D, I = index.search(np.array([query_embedding], dtype=np.float32), k_initial)
# Get the most similar nodes
similar_indices = I[0]
similar_nodes = [nodes[idx] for idx in similar_indices if idx < len(nodes)]
# Step 3: Apply cross-encoder reranking if needed
processing_steps = self.data_handles.handlers.get(doc_type, {}).get("processing_steps", [])
if "crossencoder" in processing_steps:
print("Applying cross-encoder reranking")
cross_model = CrossEncoder('BAAI/bge-reranker-base')
# Prepare document pairs for reranking
query_chunk_pairs = [(query_text, node["content"]) for node in similar_nodes]
scores = cross_model.predict(query_chunk_pairs)
# Combine nodes with their scores
node_score_pairs = list(zip(similar_nodes, scores))
# Sort by score (highest first)
ranked_nodes = sorted(node_score_pairs, key=lambda x: x[1], reverse=True)
# Take top nodes after reranking
top_nodes = [node for node, _ in ranked_nodes[:k]]
else:
# Just limit the number if no reranking
top_nodes = similar_nodes[:k]
# Step 4: Format the results
blocks = []
for i, node in enumerate(top_nodes):
content = node["content"]
uid = node["uid"]
parent_name = node["parent_name"]
bibtex = node["bibtex"]
node_type = node["node_type"]
# Format the content block
content = self.extract_for_queries(content, self.extended_query)
block_text = f"[block {self.block_counter}] {content}"
blocks.append(block_text)
# Create reference entry
if bibtex and '@' in bibtex:
# Literature reference with BibTeX
self.blocks_dict[self.block_counter] = {
"type": "literature",
"id": f"lit_{self.block_counter}",
"bibtex": bibtex,
"content": f"Neo4j literature: {parent_name}"
}
else:
# Generic document reference
self.blocks_dict[self.block_counter] = {
"type": "generic",
"id": f"doc_{self.block_counter}",
"content": f"Document: {parent_name}"
}
# Increment block counter
self.block_counter += 1
return {"blocks": blocks, "count": len(blocks)}
except Exception as e:
print(f"Error processing block results: {str(e)}")
return {"blocks": [], "count": 0}
except Exception as e:
print(f"Error in FAISS processing: {str(e)}")
return {"blocks": [], "count": 0}
def collect_text_blocks(self, data,query):
embedding_function = OpenAIEmbeddings()
if "crossencoder" in data["processing_steps"]:
initial_k=data["inclusions"]*10
else:
initial_k=data["inclusions"]
if "extend_query" in data["processing_steps"]:
self.extend_query(query)
self.extended_query.append(query)
else:
self.extended_query=[query]
## First step is alway a similarity search
collected_blocks = []
retriever = data['data'].as_retriever(
search_type="similarity",
search_kwargs={"k": initial_k*3}
)
for q in self.extended_query:
print("working on query ",q)
retrieved_docs = retriever.invoke(q)
retrieved_texts = [doc.page_content for doc in retrieved_docs if len(doc.page_content)>30]
# Here we recompute embeddings for each candidate document.
candidate_embeddings = np.array([self.normalize(embedding_function.embed_query(doc))
for doc in retrieved_texts])
# Compute and normalize the query embedding
query_embedding = self.normalize(np.array(embedding_function.embed_query(q)))
# 4. Run MMR to select a diverse subset of documents
print("running MMR")
#retrieved_texts = self.mmr(query_embedding, candidate_embeddings, retrieved_texts, lambda_param=0.5, top_k=initial_k)
retrieved_texts=self.similarity_threshold_filter(query_embedding, candidate_embeddings, retrieved_texts, similarity_threshold=0.95,top_k=initial_k)
## If crossencoder is used, we need to rerank the results
if "crossencoder" in data["processing_steps"]:
cross_model = CrossEncoder('BAAI/bge-reranker-base')
query_chunk_pairs = [(q, chunk) for chunk in retrieved_texts]
scores = cross_model.predict(query_chunk_pairs)
chunk_score_pairs = list(zip(retrieved_texts, scores))
ranked_chunks = sorted(chunk_score_pairs, key=lambda x: x[1], reverse=True)
retrieved_texts = [chunk for chunk, score in ranked_chunks[:data["inclusions"]//2]]
#print("blocks from ",q," \n","\n".join(retrieved_texts))
for block in retrieved_texts:
collected_blocks.append("[block "+str(self.block_counter)+"] "+block)
self.inline_refs['block '+str(self.block_counter)]='VStore Block '+str(self.block_counter)
self.block_counter+=1
return "\n".join(collected_blocks)
def generate_prompt(self,template,data_sections,query):
prompt_template=my_prompt_templates.get(template,'')
if prompt_template=="":
prompt_template=my_prompt_templates.get("Vaccine_base",'')
prompt=prompt_template["Instructions"]+"\n"
# Add detailed instructions if provided
detailed_instructions = self.flow_control.get("detailed_instructions", "").strip()
if detailed_instructions:
prompt += "\n[Additional Detailed Instructions]:\n" + detailed_instructions + "\n"
i=0
for i, key in enumerate(data_sections.keys()):
prompt=prompt+"Step "+str(i+1)+" on section labeled [" +key+"]: "+self.data_handles.handlers[key]['instructions']+ "\n"
if self.flow_control["enable_search"]:
prompt=prompt+"Step "+str(i+2)+" on section labeled [web search results] : Provide a summary of the given context data extracted from the web, using summary tables when possible.\n"
if self.flow_control["enable_memory"]:
prompt=prompt+"Step "+str(i+3)+" on section labeled [previous chats] : Also take into account your previous answers.\n"
prompt=prompt+prompt_template["Output Constraints"]+"\n\n"
i=0
for i, key in enumerate(data_sections.keys()):
prompt=prompt+"Data section "+str(i+1)+"- [" +key+"]\n"+data_sections[key]+ "\n"
if self.flow_control["enable_search"]:
prompt=prompt+"Data section "+str(i+2)+"- [web search results] \n"+self.search_results+ "\n"
if self.flow_control["enable_memory"]:
prompt=prompt+"Data section "+str(i+3)+"- [previous chats] \n"+self.chat_memory.get_formatted_history()+ "\n"
prompt=prompt+"User query: "+query
return prompt
def extend_query(self,query):
print(f"š Extending query with additional variations...")
print(f" š¤ Using small LLM (gpt-4o-mini) for query expansion")
self.small_llm_usage["query_expansion"] += 1
self.small_llm_usage["total_calls"] += 1
llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)
prompt = f"""
You are an AI that enhances a given user search query to improve information retrieval.
### User Query:
{query}
### Instructions:
- Provide exactly 5 expanded queries.
- Each query should explore a different aspect or perspective of the original query.
- Use synonyms, related terms, or rephrased versions to cover various dimensions of the topic.
- Ensure that the expanded queries are relevant and coherent with the original query.
- Avoid generating queries that are too similar to each other.
- ONLY return the text of the expanded queries.
### Expanded Queries:
"""
answer = llm.invoke(prompt)
self.extended_query=[x for x in answer.content.strip().split("\n") if x != ""]
print(f"ā
Query extension complete. Total queries: {len(self.extended_query) + 1}") # +1 for original
return
def normalize(self,vector):
return vector / np.linalg.norm(vector)
def mmr(self,query_embedding, candidate_embeddings, candidate_docs, lambda_param=0.7, top_k=5):
# Compute similarity between the query and each candidate (dot product assumes normalized vectors)
candidate_similarities = np.dot(candidate_embeddings, query_embedding)
# Initialize selected and remaining indices
selected_indices = []
candidate_indices = list(range(len(candidate_docs)))
# First selection: candidate with highest similarity
first_idx = int(np.argmax(candidate_similarities))
selected_indices.append(first_idx)
candidate_indices.remove(first_idx)
# Iteratively select documents that balance relevance and diversity
while len(selected_indices) < top_k and candidate_indices:
best_score = -np.inf
best_idx = None
for idx in candidate_indices:
# Relevance score for candidate idx
relevance = candidate_similarities[idx]
# Diversity score: maximum similarity with any already selected document
diversity = max(np.dot(candidate_embeddings[idx], candidate_embeddings[sel_idx])
for sel_idx in selected_indices)
# Combined MMR score
score = lambda_param * relevance - (1 - lambda_param) * diversity
if score > best_score:
best_score = score
best_idx = idx
selected_indices.append(best_idx)
candidate_indices.remove(best_idx)
return [candidate_docs[i] for i in selected_indices]
def similarity_threshold_filter(self, query_embedding, candidate_embeddings, candidate_docs, similarity_threshold=0.9,top_k=5):
selected_docs = []
selected_embeddings = []
# Compute query similarity scores for sorting candidates (highest first)
candidate_scores = np.dot(candidate_embeddings, query_embedding)
sorted_indices = np.argsort(candidate_scores)[::-1]
for idx in sorted_indices:
candidate_embedding = candidate_embeddings[idx]
# Check if candidate is too similar to any already selected document
is_redundant = any(np.dot(candidate_embedding, sel_emb) >= similarity_threshold
for sel_emb in selected_embeddings)
if not is_redundant and len(selected_docs) < top_k:
print("appending ",candidate_docs[idx])
selected_docs.append(candidate_docs[idx])
selected_embeddings.append(candidate_embedding)
return selected_docs
def consolidate_with_large_llm_and_citations(self, *args, **kwargs):
"""
Wrapper method that delegates to the ExtensiveSearchManager instance.
This provides backward compatibility and easy access to the dual approach functionality.
"""
if self.extensive_search_manager:
return self.extensive_search_manager.consolidate_with_large_llm_and_citations(*args, **kwargs)
else:
raise RuntimeError("ExtensiveSearchManager not initialized. Cannot use consolidate_with_large_llm_and_citations.")
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| bases | - | - | - |
Parameter Details
bases: Parameter (type unspecified)
Return Value
Returns unspecified type
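Example: a minimal configuration sketch (hypothetical; it assumes the module can be imported and that set_api_keys succeeds in your environment). The flow-control keys shown are the ones the collection methods documented below read at runtime; the import path and values are illustrative.
# Hypothetical usage sketch - import path and values are illustrative only
from OneCo_hybrid_RAG import OneCo_hybrid_RAG

rag = OneCo_hybrid_RAG()

# Switches read by collect_data_from_chroma / collect_data_from_neo4j_new
rag.flow_control["enable_extensive_search"] = True         # one comprehensive block instead of per-chunk blocks
rag.flow_control["extensive_search_chunks"] = 100          # chunk budget for extensive search
rag.flow_control["enable_keyword_filtering"] = True        # filter retrieved documents by keywords
rag.flow_control["manual_keywords"] = "stability, adjuvant"  # comma-separated; empty string falls back to LLM extraction
rag.flow_control["enable_reference_filtering"] = True      # drop references below the relevance threshold
rag.flow_control["relevance_threshold"] = 0.3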
Class Interface
Methods
__init__(self)
Purpose: Internal method: init
Returns: None
get_instruction_template(self, template_name)
Purpose: Get instruction template by name
Parameters:
template_name: Parameter
Returns: None
save_instruction_template(self, template_name, instructions)
Purpose: Save a custom instruction template
Parameters:
template_name: Parameter; instructions: Parameter
Returns: None
load_custom_templates(self)
Purpose: Load custom instruction templates from file
Returns: None
init_connections(self)
Purpose: Performs init connections
Returns: None
run_query(self, query, params)
Purpose: Execute a Cypher query and return the result. Parameters: query (str): the Cypher query to execute; params (dict, optional): parameters for the query. Returns: the query result.
Parameters:
query: Parameter; params: Parameter
Returns: See docstring for return details
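Example: a minimal sketch of calling run_query, assuming init_connections() has opened the Neo4j session the method relies on; the Cypher text and parameter are placeholders.
# Hypothetical sketch - Cypher text and parameters are placeholders
rag = OneCo_hybrid_RAG()
rag.init_connections()  # opens the Neo4j session used by run_query
result = rag.run_query(
    "MATCH (p:Publication) RETURN p.Name AS name LIMIT $n",
    params={"n": 5},
)
for record in result:
    print(record)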
evaluate_query(self, query, params)
Purpose: Execute a Cypher query and return a single result. Parameters: query (str): the Cypher query to execute; params (dict, optional): parameters for the query. Returns: the single result value.
Parameters:
query: Parameter; params: Parameter
Returns: See docstring for return details
push_changes(self, node)
Purpose: Push changes to a node to the database. Parameters: node (dict or node-like object): node with properties to update.
Parameters:
node: Parameter
Returns: None
count_tokens(self, text)
Purpose: Performs count tokens
Parameters:
text: Parameter
Returns: None
set_api_keys(self)
Purpose: Sets api keys
Returns: None
extract_core_query(self, query_text)
Purpose: Extracts the core information-seeking question from a user query that may contain both a question and processing instructions for the RAG system. Args: query_text: The original user query text Returns: dict: Contains the extracted information with keys: - core_question: The actual information need/question - instructions: Any processing instructions found - is_complex: Boolean indicating if query contained instructions
Parameters:
query_text: Parameter
Returns: See docstring for return details
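Example: a short sketch of the call and the documented return keys; the query string is illustrative.
# Hypothetical sketch - the query string is illustrative
rag = OneCo_hybrid_RAG()
parsed = rag.extract_core_query(
    "Summarise the available stability data and present the answer as a table"
)
print(parsed["core_question"])  # the actual information need
print(parsed["instructions"])   # processing instructions detected in the query
print(parsed["is_complex"])     # True when instructions were found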
extract_serper_results(self, serper_response)
Purpose: Extract formatted search results and URLs from GoogleSerperAPI response. Args: serper_response: Raw response from GoogleSerperAPI (JSON object or string) Returns: tuple: (formatted_results, extracted_urls)
Parameters:
serper_response: Parameter
Returns: See docstring for return details
response_callback(self, query)
Purpose: Performs response callback
Parameters:
query: Parameter
Returns: None
get_embedding(self, text)
Purpose: Generate an embedding for the given text using OpenAI's text-embedding-ada-002 model.
Parameters:
text: Parameter
Returns: None
extract_for_queries(self, text, queries, max_tokens, api_key)
Purpose: Extract information from text based on queries. Args: text: Text to extract from queries: List of queries to guide extraction max_tokens: Maximum tokens in the output api_key: API key for the LLM service Returns: Extracted text relevant to the queries
Parameters:
text: Parameter; queries: Parameter; max_tokens: Parameter; api_key: Parameter
Returns: See docstring for return details
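Example: a sketch of query-guided extraction; the text, queries and token budget are placeholders, and all parameters are passed explicitly because their defaults are not documented here.
import os

# Hypothetical sketch - text and queries are placeholders
rag = OneCo_hybrid_RAG()
long_text = "..."  # e.g. a full document retrieved from ChromaDB or Neo4j
relevant = rag.extract_for_queries(
    long_text,
    queries=["storage stability requirements"],
    max_tokens=500,
    api_key=os.environ["OPENAI_API_KEY"],
)
print(relevant)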
parse_handler(self, query, detail_level)
Purpose: Performs parse handler
Parameters:
query: Parameterdetail_level: Parameter
Returns: None
reformat_data(self, data, min_document_length, similarity_threshold, use_crossencoder, inclusions)
Purpose: Reformat and filter data to be grouped by ID, excluding too-short documents and documents that are too similar to each other. Optionally applies crossencoder ranking. Args: data: Original data structure min_document_length: Minimum character length for documents to include (default: 30) similarity_threshold: Threshold for document similarity (default: 0.95, higher means more similar) use_crossencoder: Whether to apply crossencoder reranking (default: False) inclusions: Number of documents to return after filtering (default: 10) Returns: List of selected documents (not dictionary)
Parameters:
data: Parameter; min_document_length: Parameter; similarity_threshold: Parameter; use_crossencoder: Parameter; inclusions: Parameter
Returns: See docstring for return details
score_reference_relevance(self, final_answer, reference_documents, relevance_threshold)
Purpose: Score the relevance of each reference document against the final answer using a small LLM. Args: final_answer: The generated answer text reference_documents: List of (metadata, content) tuples for reference documents relevance_threshold: Minimum score to include a reference (0.0-1.0) Returns: List of (metadata, content, score) tuples for documents above threshold
Parameters:
final_answer: Parameter; reference_documents: Parameter; relevance_threshold: Parameter
Returns: See docstring for return details
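Example: a sketch of the documented input/output shapes, with placeholder documents; (metadata, content) tuples go in and (metadata, content, score) tuples above the threshold come back.
# Hypothetical sketch - documents and answer text are placeholders
rag = OneCo_hybrid_RAG()
references = [
    ({"title": "Document A"}, "content of document A"),
    ({"title": "Document B"}, "content of document B"),
]
kept = rag.score_reference_relevance(
    final_answer="...generated answer text...",
    reference_documents=references,
    relevance_threshold=0.3,
)
for metadata, content, score in kept:
    print(metadata["title"], round(score, 2))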
extract_filter_keywords(self, query, n_keywords)
Purpose: Extracts distinguishing keywords from a query for filtering search results.
Parameters:
query: The user's query text
n_keywords: Maximum number of keywords to extract
Returns: List of keywords for filtering
collect_data_from_chroma(self, data, query, detail_level)
Purpose: Collects relevant documents from ChromaDB using an optimized workflow: 1) combine results from all extended queries, 2) apply keyword filters across all results, 3) remove similar documents and apply the cross-encoder if requested, 4) evaluate against the target and add additional documents as needed.
Parameters:
data: Configuration data for collection and processing
query: The user query
detail_level: Level of detail for the summary (Summary, Balanced, Detailed, Comprehensive)
Returns: String with collected document blocks
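The combine-then-filter part of this workflow can be sketched against a Chroma collection as follows; the client path, collection name, and block formatting are assumptions.
# Hedged sketch: query a Chroma collection for every extended query, merge the
# hits by id, then apply a simple keyword filter.
import chromadb

def collect_data_from_chroma_sketch(queries, keywords, n_results=20):
    client = chromadb.PersistentClient(path="./chroma_db")   # assumed storage path
    collection = client.get_collection("documents")          # assumed collection name
    merged = {}
    for q in queries:
        hits = collection.query(query_texts=[q], n_results=n_results)
        for doc_id, doc, meta in zip(hits["ids"][0], hits["documents"][0],
                                     hits["metadatas"][0]):
            merged.setdefault(doc_id, (meta or {}, doc))      # de-duplicate by id
    if keywords:
        merged = {k: v for k, v in merged.items()
                  if any(kw.lower() in v[1].lower() for kw in keywords)}
    return "\n\n".join(f"[{meta.get('source', doc_id)}]\n{doc}"
                       for doc_id, (meta, doc) in merged.items())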
get_embedding(self, text)
Purpose: Generate an embedding for the given text using OpenAI's text-embedding model.
Parameters:
text: The text to embed
Returns: The embedding vector for the input text
collect_data_from_neo4j(self, data, query, doc_type, detail_level)
Purpose: Collects relevant documents from Neo4j using keyword pre-filtering and FAISS vector search.
Parameters:
data: Dictionary containing search configuration
query: User's query text
doc_type: Type of documents to search (default: "literature data")
detail_level: Level of detail for the summary (Summary, Balanced, Detailed, Comprehensive)
Returns: String with formatted text blocks for LLM context
collect_data_from_neo4j_new(self, data, query, doc_type, detail_level)
Purpose: Collects relevant documents from Neo4j using an optimized workflow: 1) combine results from all extended queries, 2) apply keyword filters across all results, 3) remove similar documents and apply the cross-encoder if requested, 4) evaluate against the target and add additional documents as needed.
Parameters:
data: Dictionary containing search configuration
query: User's query text
doc_type: Type of documents to search (default: "literature data")
detail_level: Level of detail for the summary (Summary, Balanced, Detailed, Comprehensive)
Returns: String with formatted text blocks for LLM context
_fetch_neo4j_nodes(self, base_query, keywords, node_type, content_field, limit)
Purpose: Helper method to fetch nodes from Neo4j with keyword filtering.
Parameters:
base_query: Base Cypher query used for filtering
keywords: Keywords for filtering results
node_type: Type of node to fetch
content_field: Field containing the content
limit: Maximum number of nodes to retrieve
Returns: List of node dictionaries
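The keyword pre-filter can be expressed as OR-ed CONTAINS clauses in Cypher. In the sketch below the node label, content property, connection details, and use of driver.execute_query (neo4j 5.x driver) are assumptions.
# Hedged sketch: keyword pre-filtering in Cypher with the official neo4j driver.
from neo4j import GraphDatabase

def fetch_nodes_sketch(uri, user, password, keywords,
                       node_type="Text_chunk", content_field="text", limit=200):
    # Build one CONTAINS clause per keyword, OR-ed together
    clauses = " OR ".join(
        f"toLower(n.{content_field}) CONTAINS toLower($kw{i})"
        for i in range(len(keywords))
    ) or "true"
    cypher = (
        f"MATCH (n:{node_type}) WHERE {clauses} "
        f"RETURN n.{content_field} AS content, elementId(n) AS id LIMIT $limit"
    )
    params = {f"kw{i}": kw for i, kw in enumerate(keywords)}
    params["limit"] = limit
    with GraphDatabase.driver(uri, auth=(user, password)) as driver:
        records, _, _ = driver.execute_query(cypher, params)
    return [r.data() for r in records]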
_process_node_type(self, base_query, query_embedding, query_text, node_type, content_field, k, embedding_function, doc_type)
Purpose: Helper method to process a specific node type (Text_chunk or Table_chunk) with FAISS vector search.
Parameters:
base_query: Base Cypher query for pre-filtering
query_embedding: Embedding vector for the query
query_text: Text of the query, used for cross-encoder ranking
node_type: Type of node to process (Text_chunk or Table_chunk)
content_field: Field containing the node content
k: Number of results to retrieve
embedding_function: Function used to generate embeddings
doc_type: Type of document
Returns: Dictionary with the blocks added and other metadata
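The FAISS step can be sketched as an in-memory inner-product index over the pre-filtered nodes; the embedding_function contract (text in, list of floats out) and the node dictionary shape are assumptions.
# Hedged sketch: in-memory FAISS search over pre-filtered node contents.
import numpy as np
import faiss

def faiss_rank_nodes_sketch(query_embedding, nodes, embedding_function, k=10):
    # Embed each node's content (embedding_function is assumed to return a list of floats)
    vectors = np.array([embedding_function(n["content"]) for n in nodes],
                       dtype="float32")
    faiss.normalize_L2(vectors)                  # cosine similarity via inner product
    index = faiss.IndexFlatIP(vectors.shape[1])
    index.add(vectors)
    query = np.array([query_embedding], dtype="float32")
    faiss.normalize_L2(query)
    scores, idx = index.search(query, min(k, len(nodes)))
    return [(float(s), nodes[i]) for s, i in zip(scores[0], idx[0]) if i != -1]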
collect_text_blocks(self, data, query)
Purpose: Collects text blocks from the configured data sources for the given query.
Parameters:
data: Configuration data for collection
query: The user query
Returns: None
generate_prompt(self, template, data_sections, query)
Purpose: Assembles the LLM prompt from the given template, the collected data sections, and the user query.
Parameters:
template: The prompt template
data_sections: Collected data sections to include in the prompt
query: The user query
Returns: None
extend_query(self, query)
Purpose: Expands the user query into additional related queries (query expansion) to broaden retrieval.
Parameters:
query: The user query to expand
Returns: None
normalize(self, vector)
Purpose: Normalizes a vector (e.g., to unit length) for use in similarity calculations.
Parameters:
vector: The vector to normalize
Returns: None
mmr(self, query_embedding, candidate_embeddings, candidate_docs, lambda_param, top_k)
Purpose: Applies Maximal Marginal Relevance (MMR) to select documents that balance relevance to the query against diversity among the already selected documents.
Parameters:
query_embedding: Embedding of the query
candidate_embeddings: Embeddings of the candidate documents
candidate_docs: The candidate documents
lambda_param: Trade-off between relevance and diversity (0-1)
top_k: Number of documents to select
Returns: None
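MMR conventionally selects, at each step, the candidate maximizing lambda_param * sim(query, doc) - (1 - lambda_param) * max sim(doc, already selected). A sketch over unit-normalized embeddings (normalization is assumed to have happened upstream, e.g. via normalize):
# Hedged sketch: Maximal Marginal Relevance over unit-normalized embeddings.
import numpy as np

def mmr_sketch(query_embedding, candidate_embeddings, candidate_docs,
               lambda_param=0.5, top_k=10):
    q = np.asarray(query_embedding, dtype=float)
    C = np.asarray(candidate_embeddings, dtype=float)
    relevance = C @ q                            # cosine similarity for normalized inputs
    selected, remaining = [], list(range(len(candidate_docs)))
    while remaining and len(selected) < top_k:
        if not selected:
            best = max(remaining, key=lambda i: relevance[i])
        else:
            S = C[selected]
            best = max(
                remaining,
                key=lambda i: lambda_param * relevance[i]
                - (1 - lambda_param) * float(np.max(S @ C[i])),
            )
        selected.append(best)
        remaining.remove(best)
    return [candidate_docs[i] for i in selected]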
similarity_threshold_filter(self, query_embedding, candidate_embeddings, candidate_docs, similarity_threshold, top_k)
Purpose: Filters candidate documents by similarity to the query embedding, keeping documents above the similarity threshold up to top_k.
Parameters:
query_embedding: Embedding of the query
candidate_embeddings: Embeddings of the candidate documents
candidate_docs: The candidate documents
similarity_threshold: Minimum similarity required to keep a document
top_k: Maximum number of documents to return
Returns: None
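A cosine-threshold filter consistent with this signature can be sketched as follows (best-first ordering and the cutoff semantics are assumptions):
# Hedged sketch: keep candidates whose cosine similarity to the query exceeds
# the threshold, best-first, up to top_k.
import numpy as np

def similarity_threshold_filter_sketch(query_embedding, candidate_embeddings,
                                       candidate_docs, similarity_threshold=0.7,
                                       top_k=10):
    q = np.asarray(query_embedding, dtype=float)
    q = q / np.linalg.norm(q)
    C = np.asarray(candidate_embeddings, dtype=float)
    C = C / np.linalg.norm(C, axis=1, keepdims=True)
    sims = C @ q
    order = np.argsort(-sims)
    keep = [i for i in order if sims[i] >= similarity_threshold][:top_k]
    return [candidate_docs[i] for i in keep]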
consolidate_with_large_llm_and_citations(self)
Purpose: Wrapper method that delegates to the ExtensiveSearchManager instance. This provides backward compatibility and easy access to the dual approach functionality.
Returns: None
Required Imports
from typing import List
from typing import Any
from typing import Dict
import os
import panel as pn
Usage Example
# Example usage:
# rag = OneCo_hybrid_RAG()          # the constructor takes no arguments
# rag.chat_interface.servable()     # expose the Panel chat UI
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
- class OneCo_hybrid_RAG_v1 (99.4% similar)
- class OneCo_hybrid_RAG (98.5% similar)
- class ODataType (47.5% similar)
- class DocumentProcessor_v1 (47.1% similar)
- class MyEmbeddingFunction_v2 (46.2% similar)