class OneCo_hybrid_RAG
A hybrid retrieval-augmented generation (RAG) chat class combining Neo4j graph queries, ChromaDB vector retrieval, optional web search, and an LLM-driven Panel chat interface
/tf/active/vicechatdev/OneCo_hybrid_RAG copy.py
790 - 1760
moderate
Purpose
Orchestrates a hybrid RAG workflow. The constructor sets API keys, flow-control defaults, a Panel chat interface, chat memory, and reference tracking, then opens Neo4j and ChromaDB connections. On each chat turn the class extracts the core question, collects context blocks from the registered data handlers (text, dataframes, vector stores, Neo4j, ChromaDB), optionally adds web search results, assembles a prompt, invokes the configured OpenAI or Azure OpenAI model, and returns a Markdown answer with formatted references.
Source Code
class OneCo_hybrid_RAG:
def __init__(self):
## Set API keys
self.set_api_keys()
## Define the flow control variables to be exposed and set default values
self.flow_control = {
"pre_model" : ["OpenAi","gpt-4o-mini",0],
"model" : ["OpenAi","gpt-4o",0],
"search_engine" : ["Serper","google"],
"enable_search" : False,
"enable_memory" : False,
"memory_max_size" : 3,
"enable_referencing" : True,
}
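## Note: each *_model entry is [provider, model_name, temperature]; the Azure branch in
## response_callback additionally reads an api_version from index 3 of the "model" entry.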
## Different types of data can be provided here and will be included in the flow
self.data_handles = SimpleDataHandle()
## Define the UI elements to be exposed
self.chat_interface=pn.chat.ChatInterface(callback=self.response_callback,width=1200,callback_exception='verbose')
## Plan for chat memory
self.chat_memory = SimpleChatMemory(max_history=self.flow_control["memory_max_size"])
self.extended_query=None
# Set up the blocks_dict for references
self.blocks_dict = {}
self.block_counter = 1
# Explicitly set OpenAI API type for this class
os.environ["OPENAI_API_TYPE"] = "openai"
self.init_connections()
return
def init_connections(self):
uri = config.DB_ADDR
user, password = config.DB_AUTH
self.driver = GraphDatabase.driver(uri, auth=(user, password))
self.session = self.driver.session(database=config.DB_NAME)
api_key = "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" # Replace with your actual API key
self.chroma_embedder=MyEmbeddingFunction("gpt-4o-mini","text-embedding-3-small",api_key)
self.chroma_client=chromadb.HttpClient(host='vice_chroma', port=8000)
self.available_collections = self.chroma_client.list_collections()
return
def run_query(self, query, params=None):
"""
Execute a Cypher query and return the result
Parameters
----------
query : str
The Cypher query to execute
params : dict, optional
Parameters for the query
Returns
-------
result
The query result
"""
if params is None:
params = {}
return self.session.run(query, params)
def evaluate_query(self, query, params=None):
"""
Execute a Cypher query and return a single result
Parameters
----------
query : str
The Cypher query to execute
params : dict, optional
Parameters for the query
Returns
-------
object
The single result value
"""
if params is None:
params = {}
result = self.session.run(query, params)
record = result.single()
if record:
return record[0]
return None
def push_changes(self, node):
"""
Push changes to a node to the database
Parameters
----------
node : dict or node-like object
Node with properties to update
"""
# Extract node properties, handling both dict-like and node-like objects
if hasattr(node, 'items'):
# Dict-like object
properties = {k: v for k, v in node.items() if k != 'labels'}
labels = node.get('labels', [])
uid = node.get('UID')
else:
# Node-like object from previous driver
properties = {k: node[k] for k in node.keys() if k != 'UID'}
labels = list(node.labels)
uid = node['UID']
# Construct labels string for Cypher
if labels:
labels_str = ':'.join(labels)
match_clause = f"MATCH (n:{labels_str} {{UID: $uid}})"
else:
match_clause = "MATCH (n {UID: $uid})"
# Update node properties
if properties:
set_clauses = [f"n.`{key}` = ${key}" for key in properties]
query = f"{match_clause} SET {', '.join(set_clauses)}"
params = {"uid": uid, **properties}
self.run_query(query, params)
return
def count_tokens(self,text):
encoding = tiktoken.get_encoding("cl100k_base")
return len(encoding.encode(text))
def set_api_keys(self):
## Public OpenAI key
os.environ["OPENAI_API_KEY"] = "<OPENAI_API_KEY>"
## Serper API key
os.environ["SERPER_API_KEY"] = "<SERPER_API_KEY>"
## AzureOpenAI endpoint
os.environ["AZURE_OPENAI_ENDPOINT"] = "https://vice-llm-2.openai.azure.com/openai/deployments/OneCo-gpt/chat/completions?api-version=2024-08-01-preview"
## AzureOpenAI key
os.environ["AZURE_OPENAI_API_KEY"] = "<AZURE_OPENAI_API_KEY>"
return
def extract_core_query(self,query_text):
"""
Extracts the core information-seeking question from a user query that may contain
both a question and processing instructions for the RAG system.
Args:
query_text: The original user query text
Returns:
dict: Contains the extracted information with keys:
- core_question: The actual information need/question
- instructions: Any processing instructions found
- is_complex: Boolean indicating if query contained instructions
"""
# Use the pre-model (smaller model) for this extraction task
llm = ChatOpenAI(
model=self.flow_control['pre_model'][1],
temperature=self.flow_control['pre_model'][2],
)
prompt = f"""
You are an AI query analyzer. Your task is to analyze the following user query and separate it into:
1. The core information-seeking question (what the user actually wants to know)
2. Any processing instructions (how the user wants information presented or processed)
User query: {query_text}
Output your analysis in strict JSON format:
```json
{{
"core_question": "The main question or information need",
"instructions": "Any processing instructions (or empty string if none)",
"is_complex": true/false (true if query contains instructions, false if it's just a question)
}}
```
Examples:
Input: "Tell me about mRNA vaccines and format the answer with bullet points"
Output:
```json
{{
"core_question": "Tell me about mRNA vaccines",
"instructions": "format the answer with bullet points",
"is_complex": true
}}
```
Input: "What are the main types of vaccine adjuvants?"
Output:
```json
{{
"core_question": "What are the main types of vaccine adjuvants?",
"instructions": "",
"is_complex": false
}}
```
Only respond with the JSON output, nothing else.
"""
response = llm.invoke(prompt)
try:
# Extract JSON from response if needed
content = response.content
if '```json' in content:
content = content.split('```json')[1].split('```')[0].strip()
elif '```' in content:
content = content.split('```')[1].split('```')[0].strip()
result = json.loads(content)
return result
except Exception as e:
# Fallback if parsing fails
print(f"Error parsing LLM response: {e}")
return {
"core_question": query_text,
"instructions": "",
"is_complex": False
}
def response_callback(self, query):
## We make a difference between the search enabled or disabled mode - the first will have 2 separate LLM calls.
## Common part - prepare the data
query_analysis = self.extract_core_query(query)
search_query = query_analysis["core_question"]
print("search query", search_query)
# Store the analysis for later use in processing
self.current_query_analysis = query_analysis
# Parse handler using the core question for retrieval
data_sections = self.parse_handler(search_query)
## prepare LLM following flow control
if self.flow_control['model'][0]=="OpenAi":
llm = ChatOpenAI(
model=self.flow_control['model'][1],
temperature=self.flow_control['model'][2],
timeout=None,
max_retries=2)
elif self.flow_control['model'][0]=="Azure":
llm = AzureChatOpenAI(
azure_deployment=self.flow_control['model'][1],
api_version=self.flow_control['model'][3],
temperature=self.flow_control['model'][2],
max_tokens=2500,
timeout=None,
max_retries=2)
else:
llm = ChatOpenAI(
model='gpt-4o',
temperature=0,
timeout=None,
max_retries=2)
## Search enabled mode
self.search_results = ""
if self.flow_control["enable_search"]:
## generate a first response to start the search
prompt=self.generate_prompt("Vaccine_google",data_sections,query)
answer = llm.invoke(prompt)
print("input for web search", answer.content)
## Strip the ```json fence from the model output before parsing (avoid shadowing the dict builtin)
search_spec = json.loads(answer.content[8:-4])
search_tool = GoogleSerperAPIWrapper()
for s in search_spec['search_queries']:
print("searching with ",s)
self.search_results = self.search_results+"\n"+ search_tool.run(s)
## This is the common part for both modes
prompt=self.generate_prompt("Vaccine_base",data_sections,query)
#print("prompt for final answer : ", prompt)
answer = llm.invoke(prompt)
# If reference formatting is enabled, apply it
if self.flow_control["enable_referencing"]:
# No need for conversion - use blocks_dict directly
ref_manager = ReferenceManager(default_style="apa")
processed_text, references_section = ref_manager.process_references(
answer.content,
self.blocks_dict,
style="apa"
)
#print("using refs", references_section)
formatted_answer = processed_text + "\n\n" + references_section
else:
formatted_answer = answer.content
self.chat_memory.save_context(
{"role": "user", "content": query},
{"role": "assistant", "content": answer.content},
)
## We generate a list of references in Markdown format
return pn.pane.Markdown(formatted_answer)
def get_embedding(self,text):
"""Generate an embedding for the given text using OpenAI's text-embedding-ada-002 model."""
response = openai.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
def parse_handler(self, query):
data_sections = {}
# Create blocks_dict directly in the format needed by ReferenceManager
self.blocks_dict = {} # Replace self.inline_refs with self.blocks_dict
self.block_counter = 1 # Start block numbering from 1 to match example
for key in self.data_handles.handlers.keys():
if self.data_handles.handlers[key]["type"] == "text":
data_sections[key] = f"[block {self.block_counter}] {self.data_handles.handlers[key]['data']}"
# Create block entry in proper format
self.blocks_dict[self.block_counter] = {
"type": "generic",
"id": f"text_{self.block_counter}",
"content": f"Text content: {self.data_handles.handlers[key]['data'][:100]}..."
}
elif self.data_handles.handlers[key]["type"] == "dataframe":
data_sections[key] = f"[block {self.block_counter}] {self.data_handles.handlers[key]['data'].to_markdown()}"
# Create block entry for dataframe
self.blocks_dict[self.block_counter] = {
"type": "generic",
"id": f"dataframe_{self.block_counter}",
"content": f"Dataframe content from {key}"
}
elif self.data_handles.handlers[key]["type"] == "vectorstore":
data_sections[key] = self.collect_text_blocks(self.data_handles.handlers[key], query)
elif self.data_handles.handlers[key]["type"] == "db_search":
data_sections[key] = self.collect_data_from_neo4j(self.data_handles.handlers[key], query)
elif self.data_handles.handlers[key]["type"] == "chromaDB":
data_sections[key] = self.collect_data_from_chroma(self.data_handles.handlers[key], query)
self.block_counter += 1
return data_sections
def reformat_data(self, data, min_document_length=30, similarity_threshold=0.95, use_crossencoder=False, inclusions=10):
"""
Reformat and filter data to be grouped by ID, excluding too-short documents
and documents that are too similar to each other. Optionally applies crossencoder ranking.
Args:
data: Original data structure
min_document_length: Minimum character length for documents to include (default: 30)
similarity_threshold: Threshold for document similarity (default: 0.95, higher means more similar)
use_crossencoder: Whether to apply crossencoder reranking (default: False)
inclusions: Number of documents to return after filtering (default: 10)
Returns:
List of selected documents (not dictionary)
"""
from sentence_transformers import CrossEncoder
import numpy as np
result = {}
selected_docs = []
selected_embeddings = []
# Unpack the nested lists for easier access
ids_list = data['ids'][0]
documents_list = data['documents'][0]
metadatas_list = data['metadatas'][0]
embeddings_array = data['embeddings'][0]
# First pass: filter by document length and organize data
candidates = []
for i, id_val in enumerate(ids_list):
# Check if document meets the minimum length requirement and does not exceed a maximum token length
if len(documents_list[i]) >= min_document_length and self.count_tokens(documents_list[i]) <= 10000:
candidates.append({
'id': id_val,
'document': documents_list[i],
'metadata': metadatas_list[i],
'embedding': embeddings_array[i].tolist() if embeddings_array is not None else None
})
# If we don't have enough candidates, return all we have
if len(candidates) <= inclusions:
return [(doc['metadata'],doc['document']) for doc in candidates]
# Second pass: filter by similarity
for candidate in candidates:
candidate_embedding = np.array(candidate['embedding'])
# Normalize embedding
norm = np.linalg.norm(candidate_embedding)
if norm > 0:
candidate_embedding = candidate_embedding / norm
# Check if candidate is too similar to any already selected document
is_redundant = False
for sel_emb in selected_embeddings:
similarity = np.dot(candidate_embedding, sel_emb)
if similarity >= similarity_threshold:
is_redundant = True
break
if not is_redundant:
# Add to result dictionary
result[candidate['id']] = {
'document': candidate['document'],
'metadata': candidate['metadata'],
'embedding': candidate['embedding']
}
# Add to selected lists for similarity checks
selected_docs.append(candidate)
selected_embeddings.append(candidate_embedding)
# If we've collected enough documents and don't need crossencoder, we can stop
if len(selected_docs) >= inclusions * 2 and not use_crossencoder:
break
# If using crossencoder for reranking
if use_crossencoder and len(selected_docs) > inclusions:
query = data.get('query_text', '')
if not query: # If no query provided, use a placeholder
query = "default query" # Ideally this should be passed in
cross_model = CrossEncoder('BAAI/bge-reranker-base')
query_doc_pairs = [(query, doc['document']) for doc in selected_docs]
scores = cross_model.predict(query_doc_pairs)
# Zip documents with their scores and sort by score (highest first)
doc_score_pairs = list(zip(selected_docs, scores))
ranked_docs = sorted(doc_score_pairs, key=lambda x: x[1], reverse=True)
# Take the top 'inclusions' documents after reranking
selected_docs = [doc for doc, _ in ranked_docs[:inclusions]]
elif len(selected_docs) > inclusions:
# If not using crossencoder but have too many docs, just take the first 'inclusions'
selected_docs = selected_docs[:inclusions]
# Return just the document text for further processing
#print("returning ",[(doc['metadata'],doc['document']) for doc in selected_docs])
return [(doc['metadata'],doc['document']) for doc in selected_docs]
def extract_filter_keywords(self, query, n_keywords=2):
"""
Extract distinguishing keywords from a query for filtering search results.
Args:
query: The user's query text
n_keywords: Maximum number of keywords to extract
Returns:
List of keywords for filtering
"""
llm = ChatOpenAI(
model=self.flow_control['pre_model'][1],
temperature=0
)
# Make the instruction much more explicit about the exact count
prompt = f"""
You are a search optimization expert. Extract EXACTLY {n_keywords} specific distinguishing keyword(s) from this query:
"{query}"
Guidelines:
- You MUST return EXACTLY {n_keywords} keyword(s) - no more, no less
- Focus on proper nouns, company names, technical terms, and specific concepts
- Select word(s) that would differentiate this topic from related but irrelevant topics
- Choose word(s) that could filter out incorrect contexts (wrong companies, unrelated domains)
- Exclude common words like "the", "and", "of"
- Return ONLY a JSON array of strings with EXACTLY {n_keywords} string(s)
Example:
For "Impact of Pfizer's mRNA vaccine development on COVID-19 transmission" and n_keywords=1
Output: ["Pfizer"]
For "Impact of Pfizer's mRNA vaccine development on COVID-19 transmission" and n_keywords=3
Output: ["Pfizer", "mRNA", "COVID-19"]
Output format:
```json
["keyword1"{", keyword2" if n_keywords > 1 else ""}{", ..." if n_keywords > 2 else ""}]
```
Remember: I need EXACTLY {n_keywords} keyword(s). Count carefully before submitting.
"""
response = llm.invoke(prompt)
try:
# Extract JSON from response
content = response.content.strip()
if '```json' in content:
content = content.split('```json')[1].split('```')[0].strip()
elif '```' in content:
content = content.split('```')[1].split('```')[0].strip()
keywords = json.loads(content)
# Force the correct number of keywords
if len(keywords) > n_keywords:
keywords = keywords[:n_keywords]
elif len(keywords) < n_keywords and len(keywords) > 0:
# If we got fewer keywords than requested but at least one, duplicate the first one
while len(keywords) < n_keywords:
keywords.append(keywords[0])
return [k.lower() for k in keywords] # Convert to lowercase for case-insensitive matching
except Exception as e:
print(f"Error extracting keywords: {e}")
# Fall back to simple keyword extraction if LLM fails
words = query.lower().split()
stopwords = ['the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'with', 'about']
return [w for w in words if w not in stopwords and len(w) > 3][:n_keywords]
def collect_data_from_chroma(self, data, query):
collected_blocks = []
# Extract filter keywords
filter_keywords = self.extract_filter_keywords(query)
print(f"Using filter keywords: {filter_keywords}")
# Configuration for retrieval
if "crossencoder" in data["processing_steps"]:
initial_k = data["inclusions"] * 10
else:
initial_k = data["inclusions"]
if "extend_query" in data["processing_steps"]:
self.extend_query(query)
self.extended_query.append(query)
else:
self.extended_query = [query]
client = self.chroma_client.get_collection(data["data"], embedding_function=self.chroma_embedder)
# Track how many documents we've added
docs_added = 0
target_docs = data["inclusions"]
for q in self.extended_query:
print("working on query ", q)
# Build where clauses for keyword filtering
# Start with hybrid search (both vector and keyword)
if filter_keywords and "keyword_filter" in data.get("processing_steps", []):
# First attempt: Try to get documents that contain ALL keywords
# We need to retrieve documents first, then filter them manually since ChromaDB
# doesn't support complex boolean operations in where clauses
print(f"Retrieving documents for ALL keywords filter: {filter_keywords}")
# Retrieve a larger batch of documents for post-filtering
retrieved_docs = client.query(
query_texts=[q],
n_results=initial_k*5, # Get more results since we'll filter many out
include=["documents", "metadatas", "embeddings"]
)
# Post-process to filter for documents containing ALL keywords
if retrieved_docs['documents'] and len(retrieved_docs['documents'][0]) > 0:
filtered_docs = {
'ids': [[]],
'documents': [[]],
'metadatas': [[]],
'embeddings': [[]]
}
for i, doc in enumerate(retrieved_docs['documents'][0]):
# Convert to lowercase for case-insensitive matching
doc_lower = doc.lower()
# Check if document contains ALL keywords
all_keywords_present = all(keyword.lower() in doc_lower for keyword in filter_keywords)
if all_keywords_present:
filtered_docs['ids'][0].append(retrieved_docs['ids'][0][i])
filtered_docs['documents'][0].append(doc)
filtered_docs['metadatas'][0].append(retrieved_docs['metadatas'][0][i])
if retrieved_docs['embeddings'] and len(retrieved_docs['embeddings'][0]) > i:
filtered_docs['embeddings'][0].append(retrieved_docs['embeddings'][0][i])
print(f"Found {len(filtered_docs['documents'][0])} documents containing ALL keywords")
# If we found documents with all keywords, process them
if filtered_docs['documents'][0]:
retrieved_texts = self.reformat_data(
filtered_docs,
min_document_length=30,
similarity_threshold=0.95,
use_crossencoder="crossencoder" in data["processing_steps"],
inclusions=(target_docs - docs_added) // 2 or 1
)
for metadata, document in retrieved_texts:
# Add reference in formatted text
collected_blocks.append(f"[block {self.block_counter}] {document}")
# Create proper blocks_dict entry
filepath = metadata.get('bibtex', '')
if filepath and filepath.lower().endswith(('.pptx', '.docx', '.xlsx', '.pdf', '.csv', '.txt')):
self.blocks_dict[self.block_counter] = {
"type": "document",
"id": f"doc_{self.block_counter}",
"path": filepath,
"description": f"Document from ChromaDB collection '{data['data']}'"
}
else:
self.blocks_dict[self.block_counter] = {
"type": "generic",
"id": f"ref_{self.block_counter}",
"content": f"ChromaDB: {data['data']} - {filepath}"
}
self.block_counter += 1
docs_added += 1
# Check if we have enough documents
if docs_added >= target_docs:
break
# If we didn't get enough results with all keywords, fallback to individual keyword filtering
if docs_added == 0 :
print(f"No results with ALL keywords, falling back to individual keywords")
# Try with individual keywords
for keyword in filter_keywords[:3]: # Limit to 3 keywords to avoid too many queries
where_filter = {"$contains": keyword}
print(f"Searching with filter: {where_filter}")
retrieved_docs = client.query(
query_texts=[q],
n_results=initial_k,
where_document=where_filter,
include=["documents", "metadatas", "embeddings"]
)
if retrieved_docs['documents'] and len(retrieved_docs['documents'][0]) > 0: # If we got results, process them
retrieved_texts = self.reformat_data(
retrieved_docs,
min_document_length=30,
similarity_threshold=0.95,
use_crossencoder="crossencoder" in data["processing_steps"],
inclusions=(target_docs - docs_added) // 2 or 1
)
for metadata, document in retrieved_texts:
# Add reference in formatted text
collected_blocks.append(f"[block {self.block_counter}] {document}")
# Create proper blocks_dict entry
filepath = metadata.get('bibtex', '')
if filepath and filepath.lower().endswith(('.pptx', '.docx', '.xlsx', '.pdf', '.csv', '.txt')):
self.blocks_dict[self.block_counter] = {
"type": "document",
"id": f"doc_{self.block_counter}",
"path": filepath,
"description": f"Document from ChromaDB collection '{data['data']}'"
}
else:
self.blocks_dict[self.block_counter] = {
"type": "generic",
"id": f"ref_{self.block_counter}",
"content": f"ChromaDB: {data['data']} - {filepath}"
}
self.block_counter += 1
docs_added += 1
# Check if we have enough documents
if docs_added >= target_docs:
break
# If we have enough documents, stop processing keywords
if docs_added >= target_docs:
break
else:
# Standard vector search without keyword filtering
where_filter = None
# If we still don't have enough results, fall back to pure vector search
if docs_added == 0 :
print(f"Falling back to pure vector search for additional documents")
retrieved_docs = client.query(
query_texts=[q],
n_results=initial_k*2,
where_document=None, # No filtering for fallback
include=["documents", "metadatas", "embeddings"]
)
retrieved_texts = self.reformat_data(
retrieved_docs,
min_document_length=30,
similarity_threshold=0.95,
use_crossencoder="crossencoder" in data["processing_steps"],
inclusions=(target_docs - docs_added) // 2 or 1
)
for metadata, document in retrieved_texts:
# Add reference in formatted text
collected_blocks.append(f"[block {self.block_counter}] {document}")
# Create proper blocks_dict entry
filepath = metadata.get('bibtex', '')
if filepath and filepath.lower().endswith(('.pptx', '.docx', '.xlsx', '.pdf', '.csv', '.txt')):
self.blocks_dict[self.block_counter] = {
"type": "document",
"id": f"doc_{self.block_counter}",
"path": filepath,
"description": f"Document from ChromaDB collection '{data['data']}'"
}
else:
self.blocks_dict[self.block_counter] = {
"type": "generic",
"id": f"ref_{self.block_counter}",
"content": f"ChromaDB: {data['data']} - {filepath}"
}
self.block_counter += 1
docs_added += 1
# Check if we have enough documents
if docs_added >= target_docs:
break
# If we have enough documents, stop processing queries
if docs_added >= target_docs:
break
print(f"Total docs added: {docs_added}")
print(f"Collected blocks: ","\n".join(collected_blocks))
return "\n".join(collected_blocks)
def get_embedding(self, text):
"""Generate an embedding for the given text using OpenAI's text-embedding model."""
# Explicitly set the API type before making the API call
import openai
original_api_type = os.environ.get("OPENAI_API_TYPE", None)
os.environ["OPENAI_API_TYPE"] = "openai" # Explicitly setting to standard OpenAI
try:
response = openai.embeddings.create(
model="text-embedding-3-small",
input=text
)
embedding = response.data[0].embedding
finally:
# Restore original API type setting
if original_api_type is not None:
os.environ["OPENAI_API_TYPE"] = original_api_type
elif "OPENAI_API_TYPE" in os.environ:
del os.environ["OPENAI_API_TYPE"]
return embedding
def collect_data_from_neo4j(self, data, query, doc_type="literature data"): # default keeps the two-argument call in parse_handler valid
collected_blocks = []
embedding_function = OpenAIEmbeddings(model="text-embedding-3-small")
if "crossencoder" in data["processing_steps"]:
initial_k=data["inclusions"]*10
else:
initial_k=data["inclusions"]
if "extend_query" in data["processing_steps"]:
self.extend_query(query)
self.extended_query.append(query)
else:
self.extended_query=[query]
if doc_type=="literature data":
cypher_query = data['data'] + """
WITH COLLECT(x) AS relevantDocs, $embedding AS query_embedding, $initial_k AS k
// Perform the vector search on the pre-filtered nodes.
$vector_call
YIELD node AS doc, score AS vectorScore
WHERE doc IN relevantDocs
// Retrieve additional fields from the parent Topic node.
MATCH (p:Publication)<--(doc)
RETURN collect([doc.id AS docId,
$target_field AS ChunkText,
p.name AS ParentName,
p.BibTex AS BibTex,
vectorScore])
"""
else:
cypher_query = data['data'] + """
WITH COLLECT(x) AS relevantDocs, $embedding AS query_embedding, $initial_k AS k
// Perform the vector search on the pre-filtered nodes.
$vector_call
YIELD node AS doc, score AS vectorScore
WHERE doc IN relevantDocs
// Retrieve additional fields from the parent Topic node.
MATCH (p:Publication)<--(doc)
RETURN collect([doc.id AS docId,
$target_field AS ChunkText,
p.name AS ParentName,
p.BibTex AS BibTex,
vectorScore])
"""
## First step is always a similarity search
for q in self.extended_query:
print("working on query ",q)
embedding = self.get_embedding(q)
## We first collect Text_chunks
vector_call='CALL db.index.vector.queryNodes("text_embedding", k, query_embedding)'
target_field='doc.text'
query_run = self.session.run(
cypher_query,
embedding=embedding,
initial_k=initial_k,
vector_call=vector_call,
target_field=target_field
)
record = query_run.single()
if record:
result = record[0]
return "\n".join(collected_blocks)
def collect_text_blocks(self, data,query):
embedding_function = OpenAIEmbeddings()
if "crossencoder" in data["processing_steps"]:
initial_k=data["inclusions"]*10
else:
initial_k=data["inclusions"]
if "extend_query" in data["processing_steps"]:
self.extend_query(query)
self.extended_query.append(query)
else:
self.extended_query=[query]
## First step is always a similarity search
collected_blocks = []
retriever = data['data'].as_retriever(
search_type="similarity",
search_kwargs={"k": initial_k*3}
)
for q in self.extended_query:
print("working on query ",q)
retrieved_docs = retriever.invoke(q)
retrieved_texts = [doc.page_content for doc in retrieved_docs if len(doc.page_content)>30]
# Here we recompute embeddings for each candidate document.
candidate_embeddings = np.array([self.normalize(embedding_function.embed_query(doc))
for doc in retrieved_texts])
# Compute and normalize the query embedding
query_embedding = self.normalize(np.array(embedding_function.embed_query(q)))
# 4. Run MMR to select a diverse subset of documents
print("running MMR")
#retrieved_texts = self.mmr(query_embedding, candidate_embeddings, retrieved_texts, lambda_param=0.5, top_k=initial_k)
retrieved_texts=self.similarity_threshold_filter(query_embedding, candidate_embeddings, retrieved_texts, similarity_threshold=0.95,top_k=initial_k)
## If crossencoder is used, we need to rerank the results
if "crossencoder" in data["processing_steps"]:
cross_model = CrossEncoder('BAAI/bge-reranker-base')
query_chunk_pairs = [(q, chunk) for chunk in retrieved_texts]
scores = cross_model.predict(query_chunk_pairs)
chunk_score_pairs = list(zip(retrieved_texts, scores))
ranked_chunks = sorted(chunk_score_pairs, key=lambda x: x[1], reverse=True)
retrieved_texts = [chunk for chunk, score in ranked_chunks[:data["inclusions"]//2]]
#print("blocks from ",q," \n","\n".join(retrieved_texts))
for block in retrieved_texts:
collected_blocks.append("[block "+str(self.block_counter)+"] "+block)
## inline_refs was replaced by blocks_dict (see parse_handler); register the block there instead
self.blocks_dict[self.block_counter] = {"type": "generic", "id": f"vstore_{self.block_counter}", "content": f"VStore Block {self.block_counter}"}
self.block_counter += 1
return "\n".join(collected_blocks)
def generate_prompt(self,template,data_sections,query):
prompt_template=my_prompt_templates.get(template,'')
if prompt_template=="":
prompt_template=my_prompt_templates.get("Vaccine_base",'')
prompt=prompt_template["Instructions"]+"\n"
i=0
for i, key in enumerate(data_sections.keys()):
prompt=prompt+"Step "+str(i+1)+" on section labeled [" +key+"]: "+self.data_handles.handlers[key]['instructions']+ "\n"
if self.flow_control["enable_search"]:
prompt=prompt+"Step "+str(i+2)+" on section labeled [web search results] : Provide a summary of the given context data extracted from the web, using summary tables when possible.\n"
if self.flow_control["enable_memory"]:
prompt=prompt+"Step "+str(i+3)+" on section labeled [previous chats] : Also take into account your previous answers.\n"
prompt=prompt+prompt_template["Output Constraints"]+"\n\n"
i=0
for i, key in enumerate(data_sections.keys()):
prompt=prompt+"Data section "+str(i+1)+"- [" +key+"]\n"+data_sections[key]+ "\n"
if self.flow_control["enable_search"]:
prompt=prompt+"Data section "+str(i+2)+"- [web search results] \n"+self.search_results+ "\n"
if self.flow_control["enable_memory"]:
prompt=prompt+"Data section "+str(i+3)+"- [previous chats] \n"+self.chat_memory.get_formatted_history()+ "\n"
prompt=prompt+"User query: "+query
return prompt
def extend_query(self,query):
llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)
prompt = f"""
You are an AI that enhances a given user search query to improve information retrieval.
### User Query:
{query}
### Instructions:
- Provide exactly 5 expanded queries.
- Each query should explore a different aspect or perspective of the original query.
- Use synonyms, related terms, or rephrased versions to cover various dimensions of the topic.
### Expanded Queries:
"""
answer = llm.invoke(prompt)
self.extended_query=[x for x in answer.content.strip().split("\n") if x != ""]
print("extended query",self.extended_query)
return
def normalize(self,vector):
return vector / np.linalg.norm(vector)
def mmr(self,query_embedding, candidate_embeddings, candidate_docs, lambda_param=0.7, top_k=5):
# Compute similarity between the query and each candidate (dot product assumes normalized vectors)
candidate_similarities = np.dot(candidate_embeddings, query_embedding)
# Initialize selected and remaining indices
selected_indices = []
candidate_indices = list(range(len(candidate_docs)))
# First selection: candidate with highest similarity
first_idx = int(np.argmax(candidate_similarities))
selected_indices.append(first_idx)
candidate_indices.remove(first_idx)
# Iteratively select documents that balance relevance and diversity
while len(selected_indices) < top_k and candidate_indices:
best_score = -np.inf
best_idx = None
for idx in candidate_indices:
# Relevance score for candidate idx
relevance = candidate_similarities[idx]
# Diversity score: maximum similarity with any already selected document
diversity = max(np.dot(candidate_embeddings[idx], candidate_embeddings[sel_idx])
for sel_idx in selected_indices)
# Combined MMR score
score = lambda_param * relevance - (1 - lambda_param) * diversity
if score > best_score:
best_score = score
best_idx = idx
selected_indices.append(best_idx)
candidate_indices.remove(best_idx)
return [candidate_docs[i] for i in selected_indices]
def similarity_threshold_filter(self, query_embedding, candidate_embeddings, candidate_docs, similarity_threshold=0.9,top_k=5):
selected_docs = []
selected_embeddings = []
# Compute query similarity scores for sorting candidates (highest first)
candidate_scores = np.dot(candidate_embeddings, query_embedding)
sorted_indices = np.argsort(candidate_scores)[::-1]
for idx in sorted_indices:
candidate_embedding = candidate_embeddings[idx]
# Check if candidate is too similar to any already selected document
is_redundant = any(np.dot(candidate_embedding, sel_emb) >= similarity_threshold
for sel_emb in selected_embeddings)
if not is_redundant and len(selected_docs) < top_k:
print("appending ",candidate_docs[idx])
selected_docs.append(candidate_docs[idx])
selected_embeddings.append(candidate_embedding)
return selected_docs
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| (none) | - | - | - |
Parameter Details
The constructor takes no arguments; behaviour is configured after instantiation via flow_control and data_handles.
Return Value
Returns an initialized OneCo_hybrid_RAG instance.
Class Interface
Methods
__init__(self)
Purpose: Internal method: init
Returns: None
init_connections(self)
Purpose: Performs init connections
Returns: None
run_query(self, query, params)
Purpose: Execute a Cypher query and return the result.
Parameters:
query: str - the Cypher query to execute; params: dict, optional - parameters for the query
Returns: The query result
evaluate_query(self, query, params)
Purpose: Execute a Cypher query and return a single result.
Parameters:
query: str - the Cypher query to execute; params: dict, optional - parameters for the query
Returns: The single result value, or None if there is no record
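A minimal sketch of how these query helpers might be called, assuming a reachable Neo4j instance configured in config (the Cypher and property names are illustrative):
rag = OneCo_hybrid_RAG()
publication_count = rag.evaluate_query("MATCH (p:Publication) RETURN count(p)")
for record in rag.run_query("MATCH (p:Publication) RETURN p.name AS name LIMIT $n", {"n": 5}):
    print(record["name"])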
push_changes(self, node)
Purpose: Push changes to a node to the database.
Parameters:
node: dict or node-like object - node with properties to update
Returns: None
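An illustrative update with rag an OneCo_hybrid_RAG instance, assuming the node is represented as a dict carrying a UID and a labels list (the name property is hypothetical):
node = {"UID": "pub-001", "labels": ["Publication"], "name": "Corrected publication title"}
rag.push_changes(node)  # builds MATCH (n:Publication {UID: $uid}) SET ... from the dict's properties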
count_tokens(self, text)
Purpose: Counts the tokens in the given text using tiktoken's cl100k_base encoding.
Parameters:
text: str - the text to tokenize
Returns: int - the number of tokens
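For example:
n_tokens = rag.count_tokens("mRNA vaccines elicit strong immune responses")  # token count under cl100k_base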
set_api_keys(self)
Purpose: Sets the OpenAI, Serper, and Azure OpenAI credentials as environment variables.
Returns: None
extract_core_query(self, query_text)
Purpose: Extracts the core information-seeking question from a user query that may contain both a question and processing instructions for the RAG system.
Parameters:
query_text: str - the original user query text
Returns: dict with keys core_question (the actual information need), instructions (any processing instructions found), and is_complex (True if the query contained instructions)
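Illustrative input and output, mirroring the examples embedded in the method's own prompt (the exact output depends on the pre-model):
analysis = rag.extract_core_query("Tell me about mRNA vaccines and format the answer with bullet points")
# analysis == {"core_question": "Tell me about mRNA vaccines",
#              "instructions": "format the answer with bullet points",
#              "is_complex": True}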
response_callback(self, query)
Purpose: Chat callback: analyzes the query, gathers context from the registered data handlers, optionally runs a web search, invokes the configured LLM, applies reference formatting, and updates chat memory.
Parameters:
query: str - the user's chat message
Returns: pn.pane.Markdown - the formatted answer
get_embedding(self, text)
Purpose: Generate an embedding for the given text using OpenAI's text-embedding-3-small model.
Parameters:
text: str - the text to embed
Returns: list of floats - the embedding vector
parse_handler(self, query)
Purpose: Builds the prompt data sections by dispatching each registered data handler (text, dataframe, vectorstore, db_search, chromaDB) and populating blocks_dict for referencing.
Parameters:
query: str - the core question used for retrieval
Returns: dict - mapping of handler name to formatted context section
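A sketch of the structure parse_handler produces, assuming a single text handler is registered on data_handles (the exact SimpleDataHandle schema is an assumption inferred from the fields the code reads):
rag.data_handles.handlers["notes"] = {"type": "text", "data": "Adjuvants boost immunogenicity.", "instructions": "Summarize the notes."}
sections = rag.parse_handler("What do adjuvants do?")
# sections == {"notes": "[block 1] Adjuvants boost immunogenicity."}
# rag.blocks_dict[1] == {"type": "generic", "id": "text_1", "content": "Text content: Adjuvants boost immunogenicity...."}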
reformat_data(self, data, min_document_length, similarity_threshold, use_crossencoder, inclusions)
Purpose: Reformat and filter retrieved data, excluding too-short documents and documents that are too similar to each other; optionally applies crossencoder reranking.
Parameters:
data: ChromaDB-style query result; min_document_length: minimum character length for documents to include (default 30); similarity_threshold: threshold for document similarity (default 0.95, higher means more similar); use_crossencoder: whether to apply crossencoder reranking (default False); inclusions: number of documents to return after filtering (default 10)
Returns: list of (metadata, document) tuples for the selected documents
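reformat_data expects a ChromaDB-style query result with one nested list per field and embeddings as a numpy array; a minimal sketch with illustrative values:
import numpy as np
chroma_result = {
    "ids": [["d1", "d2"]],
    "documents": [["A sufficiently long passage about vaccine adjuvants.", "Another sufficiently long passage about mRNA platforms."]],
    "metadatas": [[{"bibtex": "adjuvants_review.pdf"}, {"bibtex": "mrna_platforms.pdf"}]],
    "embeddings": [np.random.rand(2, 1536)],
}
pairs = rag.reformat_data(chroma_result, inclusions=1)  # [(metadata, document)] after length, redundancy and count filtering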
extract_filter_keywords(self, query, n_keywords)
Purpose: Extract distinguishing keywords from a query for filtering search results.
Parameters:
query: str - the user's query text; n_keywords: int - number of keywords to extract (the method forces exactly this many)
Returns: list of lowercase keywords for filtering
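An illustrative call, mirroring the examples in the method's own prompt (actual keywords depend on the pre-model's output):
keywords = rag.extract_filter_keywords("Impact of Pfizer's mRNA vaccine development on COVID-19 transmission", n_keywords=2)
# e.g. ["pfizer", "mrna"]  (keywords are lower-cased for case-insensitive matching)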
collect_data_from_chroma(self, data, query)
Purpose: Retrieves context chunks from a ChromaDB collection using hybrid keyword/vector search with optional query extension and crossencoder reranking, and registers each chunk in blocks_dict.
Parameters:
data: dict - handler configuration (collection name, processing steps, inclusions); query: str - the retrieval query
Returns: str - newline-joined context blocks
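A sketch of the handler configuration this retrieval path consumes, inferred from the fields the method reads; the collection name and the exact SimpleDataHandle schema are assumptions:
handler = {
    "type": "chromaDB",
    "data": "vaccine_literature",                      # ChromaDB collection name (illustrative)
    "processing_steps": ["keyword_filter", "crossencoder"],
    "inclusions": 8,
    "instructions": "Summarize the retrieved literature chunks.",
}
context_blocks = rag.collect_data_from_chroma(handler, "Which adjuvants are used in mRNA vaccines?")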
get_embedding(self, text)
Purpose: Generate an embedding for the given text using OpenAI's text-embedding model (this later definition overrides the earlier one and temporarily forces OPENAI_API_TYPE to "openai").
Parameters:
text: str - the text to embed
Returns: list of floats - the embedding vector
collect_data_from_neo4j(self, data, query, doc_type)
Purpose: Retrieves context chunks from Neo4j by running a vector index search over pre-filtered nodes, with optional query extension.
Parameters:
data: dict - handler configuration including the Cypher pre-filter; query: str - the retrieval query; doc_type: str - selects the Cypher template (e.g. "literature data")
Returns: str - newline-joined context blocks
collect_text_blocks(self, data, query)
Purpose: Retrieves context blocks from a LangChain vector store via similarity search, filters near-duplicates, optionally reranks with a crossencoder, and registers the blocks for referencing.
Parameters:
data: dict - handler configuration holding the vector store; query: str - the retrieval query
Returns: str - newline-joined context blocks
generate_prompt(self, template, data_sections, query)
Purpose: Assembles the final prompt from a prompt template, per-handler instructions, the collected data sections, optional web search results, chat memory, and the user query.
Parameters:
template: str - prompt template name; data_sections: dict - context sections from parse_handler; query: str - the user query
Returns: str - the assembled prompt
extend_query(self, query)
Purpose: Expands the user query into five alternative search queries with a small LLM and stores them in self.extended_query.
Parameters:
query: str - the original query
Returns: None
normalize(self, vector)
Purpose: Normalizes a vector to unit length.
Parameters:
vector: numpy array - the vector to normalize
Returns: The unit-length vector
mmr(self, query_embedding, candidate_embeddings, candidate_docs, lambda_param, top_k)
Purpose: Selects a relevant yet diverse subset of documents using Maximal Marginal Relevance over normalized embeddings.
Parameters:
query_embedding: normalized query vector; candidate_embeddings: matrix of normalized candidate vectors; candidate_docs: list of candidate documents; lambda_param: weight between relevance and diversity; top_k: number of documents to select
Returns: list of selected documents
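A small worked example with unit-normalized toy vectors: MMR first picks the most query-similar document, then trades relevance against redundancy, so the near-duplicate is passed over in favour of the distinct document:
import numpy as np
q = np.array([1.0, 0.0])                                    # unit-length query embedding
cands = np.array([rag.normalize(np.array(v)) for v in
                  ([1.0, 1.0], [0.99, 1.0], [0.9, -1.0])])  # two near-duplicates plus one distinct vector
docs = ["doc A", "doc A (near duplicate)", "doc B"]
rag.mmr(q, cands, docs, lambda_param=0.5, top_k=2)          # -> ["doc A", "doc B"]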
similarity_threshold_filter(self, query_embedding, candidate_embeddings, candidate_docs, similarity_threshold, top_k)
Purpose: Selects up to top_k documents in order of query similarity, skipping any candidate whose similarity to an already selected document reaches the threshold.
Parameters:
query_embedding: normalized query vector; candidate_embeddings: matrix of normalized candidate vectors; candidate_docs: list of candidate documents; similarity_threshold: redundancy cutoff (default 0.9); top_k: number of documents to select
Returns: list of selected documents
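Reusing q, cands, and docs from the MMR sketch above, the threshold filter visits candidates in order of query similarity and drops the near-duplicate because its cosine similarity to the already selected document exceeds 0.95:
rag.similarity_threshold_filter(q, cands, docs, similarity_threshold=0.95, top_k=2)  # -> ["doc A", "doc B"]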
Required Imports
import os
import json
import numpy as np
import panel as pn
import tiktoken
import openai
import chromadb
from typing import List, Any, Dict
from neo4j import GraphDatabase
from sentence_transformers import CrossEncoder
from langchain_openai import ChatOpenAI, AzureChatOpenAI, OpenAIEmbeddings
from langchain_community.utilities import GoogleSerperAPIWrapper
# LangChain import paths may vary with the installed version; the source also relies on project-local modules: config, SimpleDataHandle, SimpleChatMemory, MyEmbeddingFunction, ReferenceManager, my_prompt_templates.
Usage Example
# Example usage:
# rag = OneCo_hybrid_RAG()  # the constructor takes no arguments
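A slightly fuller sketch, assuming the project-local SimpleDataHandle exposes its handlers dict as the source above suggests and that the app is served with Panel (both are assumptions):
# rag = OneCo_hybrid_RAG()
# rag.flow_control["enable_search"] = True   # allow the Serper-backed web search step
# rag.data_handles.handlers["notes"] = {"type": "text", "data": "Adjuvants boost immunogenicity.", "instructions": "Summarize the notes."}
# rag.chat_interface.servable()              # expose the Panel chat UI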
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
- class OneCo_hybrid_RAG_v1 (99.0% similar)
- class OneCo_hybrid_RAG_v2 (98.5% similar)
- class ODataType (49.2% similar)
- class DocumentProcessor_v1 (46.1% similar)
- class MyEmbeddingFunction_v2 (46.0% similar)