class pathobrowser_base_v1
Base class that contains all static elements of the app. Parameters: image (str) – an Image UID which may be passed on app startup; the app immediately redirects to that image. Attributes: current_user (Userclass) – a class containing various information on the user; workspace (panel.layout.Column) – the main container of the app; sidebar (panel.layout.Column) – container showing items on the side of the app; head (panel.layout.Row) – the header of the app; modal (panel.layout.Column) – the container for the modal window of the app.
/tf/active/vicechatdev/datacapture.py
124 - 4351
moderate
Purpose
Base class that contains all static elements of the app. Parameters: image (str) – an Image UID which may be passed on app startup; the app immediately redirects to that image. Attributes: current_user (Userclass) – a class containing various information on the user; workspace (panel.layout.Column) – the main container of the app; sidebar (panel.layout.Column) – container showing items on the side of the app; head (panel.layout.Row) – the header of the app; modal (panel.layout.Column) – the container for the modal window of the app.
Source Code
class pathobrowser_base(param.Parameterized):
"""
Base class that contains all static elements of the app
Parameters
----------
image : str
An Image UID which may be passed on app startup. Immediately redirects to said image
Attributes
----------
current_user : Userclass
A class containing various information on the user
workspace : panel.layout.Column
The main container of the app
sidebar : panel.layout.Column
Container showing items on the side of the app
head : panel.layout.Row
The header of the app
modal : panel.layout.Column
The container for the modal window of the app
"""
#image = param.String()
page_watchers = param.Dict({}, doc='A dictionary of watchers formatted as {obj:watcher}')
def __init__(self):
    """
    Connect to the database, load the templates and build all static widgets.

    The database connection and the template tree are initialised first.
    After that the layout containers, the header widgets and the chat
    option widgets are created. Every chat option widget that reports to
    ``chat_toggle`` receives an ``origin`` attribute, because that handler
    dispatches on ``event.obj.origin``.
    """
    super().__init__()
    self.init_connections()
    self.load_template_tree()
    self.log = None
    self.syslog = None
    self.current_user = User()

    # --- layout containers -------------------------------------------------
    self.workspace = pn.Column(sizing_mode="stretch_both", align="start", styles={'overflow': 'auto'})
    self.upnav = pn.Column(sizing_mode="stretch_both", styles={'background': 'lightgrey', 'overflow': 'auto'}, align="start")
    self.downnav = pn.Column(sizing_mode="stretch_both", styles={'background': 'lightgrey', 'overflow': 'auto'}, align="start")
    self.sidebar = pn.Column(styles={'overflow': 'auto'})
    self.codebox = pn.Column(visible=False)
    self.head = pn.Row(align='end', styles={'overflow': 'auto'})
    pn.state.on_session_destroyed(file_cleanup)

    # --- session state -----------------------------------------------------
    self.token_carriers = ['Configuration', 'Projects', 'Project', 'People', 'Compounds', 'Customers', 'Customer', 'Library']
    self.selected_data = None
    self.llm_query = ""
    self.keywords_selected = ""
    self.selected_categories = {}
    self.selected_filter = {}
    self.shopping_cart = pd.DataFrame(columns=['UID', 'Name', 'Type'])
    self.loading_indicator = pn.indicators.LoadingSpinner(value=False, name='Idle...', height=50, color='info', bgcolor='dark')

    # --- header widgets ----------------------------------------------------
    report_items = [('Generator', 'f'), ('ExpSpace', 'g')]
    self.report_button = pn.widgets.MenuButton(name='Report', items=report_items, stylesheets=['./assets/stylesheets/header_button.css'], min_width=50, max_width=120, width_policy='fit')
    self.report_button.target = ''
    self.report_button.on_click(self.display_report)

    self.searchlimitbt = pn.widgets.Button(name='Please select object', stylesheets=['./assets/stylesheets/header_button.css'], min_width=50, max_width=200, width_policy='fit')
    self.searchlimitbt.filter = ''
    self.searchlimitbt.handback = "self.create_viewbox(row['UID'])"
    self.searchlimitbt.on_click(self.display_search)

    # --- chat option widgets -----------------------------------------------
    def tracked(widget, origin):
        # Tag the widget and forward its value changes to chat_toggle.
        widget.origin = origin
        widget.param.watch(self.chat_toggle, 'value')
        return widget

    def outline_toggle(label, origin, initial=False):
        toggle = pn.widgets.Toggle(name=label, button_type='primary', width=200, value=initial, button_style='outline')
        return tracked(toggle, origin)

    # Data sources that can be included in a chat request
    self.chat_with_internal = outline_toggle('Include Internal data', "chat_with_internal")
    self.chat_with_external = outline_toggle('Include Literature data', "chat_with_external")
    self.chat_with_web = outline_toggle('Perform Web search', "chat_with_web")
    self.chat_with_history = outline_toggle('Include Chat history', "chat_with_history")
    self.chat_context_items = pn.widgets.IntInput(name='Number of Context items', value=30, width=200)
    self.chat_int_stores = tracked(
        pn.widgets.MultiChoice(name='Internal data stores', value=[], options=['T001', 'SP_store'], width=200),
        "chat_int_stores")

    # Extensive search
    self.chat_extensive_search = outline_toggle('Enable Extensive Search', "chat_extensive_search")
    self.chat_extensive_chunks = tracked(
        pn.widgets.IntInput(name='Max chunks for extensive search', value=100, start=10, end=500, step=10, width=200),
        "chat_extensive_chunks")
    self.chat_summary_tokens = tracked(
        pn.widgets.IntInput(name='Target summary tokens', value=8000, start=4000, end=20000, step=1000, width=200),
        "chat_summary_tokens")

    # Detail level
    self.chat_detail_level = tracked(
        pn.widgets.Select(name='Detail Level', value='Balanced', options=['Summary', 'Balanced', 'Detailed', 'Comprehensive'], width=200),
        "chat_detail_level")

    # Keyword filtering, automatic and manual
    self.chat_keyword_filtering = outline_toggle('Enable Keyword Filtering', "chat_keyword_filtering")
    self.chat_manual_keywords = tracked(
        pn.widgets.TextInput(name='Manual filter keywords (comma separated)', value='', width=400, placeholder='e.g., vaccine, mRNA, stability'),
        "chat_manual_keywords")

    # Reference relevance filtering
    self.chat_reference_filtering = outline_toggle('Enable Reference Relevance Filtering', "chat_reference_filtering", initial=True)
    self.chat_relevance_threshold = tracked(
        pn.widgets.FloatSlider(name='Reference relevance threshold', value=0.3, start=0.1, end=0.9, step=0.1, width=200),
        "chat_relevance_threshold")

    # Instruction templates and free-text instructions
    self.chat_instruction_template = tracked(
        pn.widgets.Select(name='Instruction Template', value='Default', options=['Default', 'Vaccine Development', 'Scientific Report', 'Technical Analysis', 'Legal Document Analysis', 'Custom'], width=200),
        "chat_instruction_template")
    self.chat_detailed_instructions = tracked(
        pn.widgets.TextAreaInput(name='Detailed Instructions', value='', height=150, width=600, placeholder='Enter specific formatting instructions, requirements, or guidelines for the LLM response...'),
        "chat_detailed_instructions")

    # Saving and loading instruction templates
    self.chat_save_instructions = pn.widgets.Button(name='Save Instructions', button_type='primary', width=150)
    self.chat_save_instructions.on_click(self.save_instruction_template)
    self.chat_load_instructions = pn.widgets.Button(name='Load Instructions', button_type='success', width=150)
    self.chat_load_instructions.on_click(self.load_instruction_template)
    self.chat_instruction_name = pn.widgets.TextInput(name='Template Name', value='', width=200, placeholder='Enter name for custom template...')
def __del__(self):
    """
    Close the database session and driver when the object is destroyed.

    Errors raised while closing are printed rather than propagated.
    """
    try:
        # Session first, then the driver that produced it.
        for handle_name in ('session', 'driver'):
            handle = getattr(self, handle_name, None)
            if handle:
                handle.close()
    except Exception as e:
        print(f"Error closing Neo4j connections: {e}")
def chat_toggle(self, event):
    """
    Apply a change of one chat option widget to the chatbox.

    The widget is identified through ``event.obj.origin``. Simple options
    are copied into ``self.chatbox.flow_control``; the data-source widgets
    register or unregister data handles on ``self.chatbox.data_handles``.

    Parameters
    ----------
    event : object
        Watcher event; ``event.obj`` is the widget and ``event.new`` its new value
    """
    origin = event.obj.origin
    new_value = event.new

    # Widgets whose value is stored unchanged under a flow_control key.
    flow_control_keys = {
        "chat_with_web": 'enable_search',
        "chat_with_history": 'enable_memory',
        "chat_extensive_search": 'enable_extensive_search',
        "chat_extensive_chunks": 'extensive_search_chunks',
        "chat_summary_tokens": 'target_summary_tokens',
        "chat_detail_level": 'detail_level',
        "chat_keyword_filtering": 'enable_keyword_filtering',
        "chat_manual_keywords": 'manual_keywords',
        "chat_reference_filtering": 'enable_reference_filtering',
        "chat_relevance_threshold": 'relevance_threshold',
        "chat_detailed_instructions": 'detailed_instructions',
    }

    if origin in flow_control_keys:
        self.chatbox.flow_control[flow_control_keys[origin]] = new_value
    elif origin == "chat_with_internal":
        if new_value:
            self.retrieve_internal_data()
            if self.report_df.size > 0:
                self.chatbox.data_handles.add_data(name="internal data", type="dataframe", data=self.report_df, filters="", processing_steps=["markdown"], inclusions=10, instructions="")
        else:
            self.chatbox.data_handles.remove_data(name="internal data")
    elif origin == "chat_with_external":
        if new_value:
            syns = self.return_selected_keywords()
            # NOTE(review): keywords are spliced into the Cypher text unescaped;
            # a keyword containing a single quote would break these queries.
            name_list = "['" + "','".join(syns) + "']"
            db_searches = [
                "match (x:Text_chunk)<-[:REFERS]-(k:Keyword) where k.Name in " + name_list,
                "match (x:Table_chunk)<-[:REFERS]-(k:Keyword) where k.Name in " + name_list,
            ]
            self.chatbox.data_handles.add_data(name="literature data", type="db_search", data=db_searches, filters="", processing_steps=["similarity", "extend_query"], inclusions=10, instructions="")
        else:
            self.chatbox.data_handles.remove_data(name="literature data")
    elif origin == "chat_int_stores":
        # Drop every vector store handle, then re-add the current selection.
        for collection_name in self.chatbox.available_collections:
            self.chatbox.data_handles.remove_data(name=f"Internal data store: {collection_name}")
        for collection_name in new_value:
            if collection_name == 'None':
                continue
            self.chatbox.data_handles.add_data(
                name=f"Internal data store: {collection_name}",
                type="chromaDB",
                data=collection_name,
                filters="",
                processing_steps=["similarity", "crossencoder", "extend_query", "keyword_filter"],
                inclusions=10,
                instructions="")
    elif origin == "chat_instruction_template":
        self.update_instruction_template(new_value)
def update_instruction_template(self, template_name):
    """
    Fill the detailed-instructions field from the named template.

    Does nothing when no chatbox exists yet or when the chatbox returns
    no text for *template_name*.
    """
    if not hasattr(self, 'chatbox'):
        return
    template_text = self.chatbox.get_instruction_template(template_name)
    if not template_text:
        return
    # Mirror the text in both the widget and the chat flow settings.
    self.chat_detailed_instructions.value = template_text
    self.chatbox.flow_control['detailed_instructions'] = template_text
def save_instruction_template(self, event):
    """
    Store the current detailed instructions under the entered template name.

    A name and non-blank instructions are required. On success the name is
    added to the template dropdown (if new) and the name field is cleared.
    """
    name = self.chat_instruction_name.value.strip()
    if not name:
        print("Please enter a template name")
        return

    text = self.chat_detailed_instructions.value
    if not text.strip():
        print("Please enter instructions to save")
        return

    if not hasattr(self, 'chatbox'):
        return

    if not self.chatbox.save_instruction_template(name, text):
        print(f"Failed to save template '{name}'")
        return

    # Make the new template selectable in the dropdown.
    known = list(self.chat_instruction_template.options)
    if name not in known:
        known.append(name)
        self.chat_instruction_template.options = known
    print(f"Template '{name}' saved successfully")
    self.chat_instruction_name.value = ""  # Clear the name field
def load_instruction_template(self, event):
    """
    Apply the template currently selected in the template dropdown.

    The template text is written to the detailed-instructions widget and
    to the chatbox flow settings; a message is printed either way.
    """
    selected = self.chat_instruction_template.value
    if not hasattr(self, 'chatbox'):
        return
    text = self.chatbox.get_instruction_template(selected)
    if not text:
        print(f"Template '{selected}' not found")
        return
    self.chat_detailed_instructions.value = text
    self.chatbox.flow_control['detailed_instructions'] = text
    print(f"Template '{selected}' loaded successfully")
def update_access_tokens(self,UID=None):
    """
    Recompute the ``Keys`` property of one node or of all non-template nodes.

    For every target node, the UIDs of all token-carrier nodes (labels in
    ``self.token_carriers``) that reach it within five outgoing hops are
    collected and stored comma-separated in the node's ``Keys`` property.
    Nodes without any carrier are left untouched.

    Parameters
    ----------
    UID : str, optional
        UID of the single node to update. When omitted, every node that
        does not carry the ``Template`` label is processed.
    """
    if UID is not None:
        all_nodes = [UID]
    else:
        all_nodes = self.evaluate_query("match (x) where not ('Template' in labels(x)) return collect(x.UID)") or []
    for node_uid in all_nodes:
        all_keys = []
        for carrier in self.token_carriers:
            # Labels cannot be query parameters, but they come from the fixed
            # token_carriers list; the UID is passed as a parameter so that
            # quotes in it cannot break the query.
            keys = self.evaluate_query(
                "match (x:" + carrier + ")-[*..5]->(y {UID:$uid}) return collect(distinct x.UID)",
                {"uid": node_uid})
            all_keys.extend(keys or [])
        if all_keys:
            self.run_query(
                "match (y {UID:$uid}) set y.Keys=$keys",
                {"uid": node_uid, "keys": ",".join(all_keys)})
    return
def load_template_tree(self):
    """
    Build ``self.template_tree`` from the template definition in the graph.

    Two queries are run: one collecting every node reachable from a
    ``Configuration`` node and one collecting every relationship that
    starts at a ``Template`` node. For each collected node whose first
    label is not ``Configuration`` an entry is stored under that node's
    first label (with ``Template`` stripped):

    ``{'data_fields': ..., 'uplinks': [...], 'downlinks': [...]}``

    ``data_fields`` holds the node's properties except ``UID``,
    ``Template_note``, ``RO`` and ``Keys``. ``uplinks`` / ``downlinks``
    are lists of ``[other_label, relationship_type, link_properties]``.
    Each relationship type seen also gets its own entry
    ``{'data_fields': link_properties}``.
    """
    #result_tree=self.graph.run("match p=((n:Configuration)-[*]->(x)) return distinct p").to_subgraph()
    template_nodes=self.evaluate_query("match (n:Configuration)-[*]->(x) return collect(distinct x)")
    #print("template_nodes ",template_nodes)
    template_relations=self.evaluate_query("match (x:Template)-[r]->(y) return collect(r)")
    #print("template_relations ",template_relations)
    self.template_tree=PropertyDict()
    all_template_nodes=template_nodes
    # element_id -> label list, used to resolve the end points of relationships.
    # The lists are mutated below ('Template' is removed in place).
    labellist = { n.element_id : list(n.labels) for n in template_nodes}
    all_template_relationships=template_relations
    # We construct a dictionary that summarizes the template: for each node type (node label) the minimum fields with type specs,
    # and also all possible up and down links - to which nodes and with which relationship properties
    for n in all_template_nodes:
        # NOTE(review): the first label is used as the node type; this relies on
        # the order in which ``n.labels`` is iterated - confirm it is stable.
        mylist=list(n.labels)
        if mylist[0]!='Configuration':
            try:
                mylist.remove('Template')
            except:
                pass
            # Field definitions: every property except the bookkeeping ones
            data_fields=PropertyDict()
            for k in n.keys():
                if not(k in ['UID','Template_note','RO','Keys']):
                    data_fields[k]=n[k]
            links_up=[]
            links_down=[]
            for r in all_template_relationships:
                # Raises KeyError if an end point was not among the collected template nodes
                endlist=labellist[r.end_node.element_id]
                try:
                    endlist.remove('Template')
                except:
                    pass
                startlist=labellist[r.start_node.element_id]
                try:
                    startlist.remove('Template')
                except:
                    pass
                if mylist[0]==endlist[0]:
                    # Relationship points at this node type: record it as an uplink
                    link_properties=PropertyDict()
                    for k in r.keys():
                        link_properties[k]=r[k]
                    links_up.append([startlist[0],r.type,link_properties])
                    self.template_tree[r.type]={'data_fields':link_properties}
                if mylist[0]==startlist[0]:
                    # Relationship starts at this node type: record it as a downlink
                    link_properties=PropertyDict()
                    for k in r.keys():
                        link_properties[k]=r[k]
                    links_down.append([endlist[0],r.type,link_properties])
                    self.template_tree[r.type]={'data_fields':link_properties}
            self.template_tree[mylist[0]]={'data_fields':data_fields,'uplinks':links_up,'downlinks':links_down}
    #print("template tree : ",self.template_tree)
    return
def safe_float(self,x):
    """
    Convert *x* to a float, tolerating decimal commas and stray spaces.

    *x* is converted with ``str()``, commas are replaced by dots and
    spaces and zero-width spaces are removed before parsing.

    Returns
    -------
    float or int
        The parsed value, or ``0`` when the cleaned text is not a number.
    """
    cleaned = str(x).replace(',', '.').replace(' ', '').replace('\u200b', '')
    try:
        return float(cleaned)
    except ValueError:
        # Only a parse failure is expected here; anything else should surface.
        return 0
def safe_int(self,x):
    """
    Convert *x* to an int, tolerating decimal commas and stray spaces.

    *x* is converted with ``str()``, commas are replaced by dots and
    spaces and zero-width spaces are removed. Text that is a plain integer
    is parsed directly. Text in decimal form (``"3,0"``, ``"12.7"``) is
    parsed as a float and truncated towards zero; previously such input
    always produced 0 because ``int()`` rejects the dot.

    Returns
    -------
    int
        The parsed value, or ``0`` when the text is not a finite number.
    """
    cleaned = str(x).replace(',', '.').replace(' ', '').replace('\u200b', '')
    try:
        return int(cleaned)
    except ValueError:
        pass
    try:
        return int(float(cleaned))
    except (ValueError, OverflowError):
        # ValueError: not a number (or NaN); OverflowError: infinity
        return 0
def _create_header_items(self):
    """
    Fill the header with the home, search, report, cart and user buttons.

    The buttons are laid out right-aligned in a single ``FlexBox`` that is
    appended to ``self.head``.
    """
    def header_button(label):
        # All locally created header buttons share size and stylesheet.
        return pn.widgets.Button(name=label, stylesheets=['./assets/stylesheets/header_button.css'], min_width=50, max_width=120, width_policy='fit')

    def go_home(e):
        self.direct_start_app()

    home_button = header_button('Home')
    home_button.on_click(go_home)

    search_button = header_button('Search by name')
    search_button.filter = ""
    search_button.handback = "self.create_viewbox(row['UID'])"
    search_button.on_click(self.display_search)

    cart_button = header_button('My cart')
    cart_button.on_click(self.report_from_cart)

    user_button = header_button(self.current_user.user)
    user_button.on_click(self.show_user_prefs)

    header_items = [
        home_button,
        search_button,
        self.searchlimitbt,
        self.report_button,
        cart_button,
        user_button,
        self.loading_indicator,
    ]
    buttons = pn.FlexBox(
        *header_items,
        flex_direction='row',
        align_content='center',
        justify_content='flex-end',
        align_items='center',
        max_height=50)
    self.head.append(buttons)
def generate_welcome(self):
    """
    Start the app for a logged-in user, or show the login screen and wait.

    Three cases, selected by ``self.current_user.login_state``:

    * ``"active"``: build the header, compute the user's read and write
      access keys, register the user with a ``SessionManager`` and start
      the app.
    * a state containing ``"token"``: the text after the last ``:`` is
      written to a ``Token`` cookie valid for seven days, the state is
      set to ``"active"`` and this method runs again.
    * anything else: watch ``login_state`` (handled by ``deferred_login``)
      and show the login screen in the workspace.
    """
    login_state = self.current_user.login_state
    if login_state == "active":
        self._create_header_items()
        # Generate the user's access keys
        all_write_keys = []
        all_read_keys = []
        user_uid = self.current_user.UID
        for carrier in self.token_carriers:
            # The label comes from the fixed token_carriers list; the UID is
            # passed as a parameter instead of being spliced into the text.
            keys = self.evaluate_query(
                "match (x:" + carrier + ")<-[r*]-(y {UID:$uid}) return collect(distinct [x.UID,r])",
                {"uid": user_uid})
            for key_uid, path_rels in keys or []:
                # One relationship with Rights == 'Write' on the path grants write access.
                if any(rel.get('Rights', '') == 'Write' for rel in path_rels):
                    all_write_keys.append(key_uid)
                else:
                    all_read_keys.append(key_uid)
        self.current_user.all_write_keys = all_write_keys
        self.current_user.all_read_keys = all_read_keys
        session_manager = SessionManager()
        session_manager.set_user_id(user_uid)
        self.direct_start_app()
    elif 'token' in login_state:  # a token login succeeded and produced a new login token
        token = login_state.split(':')[-1]
        cookie = pn.pane.HTML(f"""
        <script>
        const d = new Date();
        d.setTime(d.getTime() + (7*24*60*60*1000));
        let expires = "expires="+ d.toUTCString() +";";
        document.cookie = "Token={token};" + expires + "domain={config.DOMAIN}";
        </script>
        """)
        # The workspace refreshes too fast for the script to run reliably, so it goes in the header.
        self.head.append(cookie)
        self.current_user.login_state = "active"
        self.generate_welcome()  # rerun to take the "active" branch
    else:
        self.current_user.param.watch(self.deferred_login, ['login_state'], onlychanged=True)
        self.current_user.login_screen()
        self.workspace.append(self.current_user.modal_content)
    return
def deferred_login(self,event):
    """
    Complete the login once the login screen reports success.

    When the new state is ``"active"`` the user's log is adopted (if none
    is set yet), the read and write access keys are computed, the header
    is built and the app is started. Other states are ignored.

    Parameters
    ----------
    event : object
        Event object whose ``new`` property contains the login state as str
    """
    if event.new != "active":
        return
    if not self.log:
        self.log = self.current_user.log
    # Generate the user's access keys
    all_write_keys = []
    all_read_keys = []
    user_uid = self.current_user.UID
    for carrier in self.token_carriers:
        # The label comes from the fixed token_carriers list; the UID is
        # passed as a parameter instead of being spliced into the text.
        keys = self.evaluate_query(
            "match (x:" + carrier + ")<-[r*]-(y {UID:$uid}) return collect(distinct [x.UID,r])",
            {"uid": user_uid})
        for key_uid, path_rels in keys or []:
            # One relationship with Rights == 'Write' on the path grants write access.
            if any(rel.get('Rights', '') == 'Write' for rel in path_rels):
                all_write_keys.append(key_uid)
            else:
                all_read_keys.append(key_uid)
    self.current_user.all_write_keys = all_write_keys
    self.current_user.all_read_keys = all_read_keys
    self._create_header_items()
    self.direct_start_app()
    return
def show_user_prefs(self, event):
    """
    Open a floating "User" panel with the user statistics and a logout button.

    The panel is appended to the workspace; its smallify, maximize,
    minimize and normalize header controls are removed.
    """
    logout_button = pn.widgets.Button(
        name="logout",
        stylesheets=['./assets/stylesheets/button.css'],
        css_classes=['green'],
        min_width=50,
        max_width=120,
        sizing_mode='scale_width',
        width_policy='fit',
        align='end')
    logout_button.on_click(self.user_logout)

    panel_content = pn.Column(self.current_user.user_stats, logout_button, align="center", sizing_mode="scale_both")

    removed_controls = {control: "remove" for control in ("smallify", "maximize", "minimize", "normalize")}
    float_panel = pn.layout.FloatPanel(
        panel_content,
        name="User",
        theme='#3DCAB1',
        contained=False,
        position='center',
        config={"headerControls": removed_controls})
    self.workspace.append(float_panel)
def user_logout(self, event):
    """
    Log the current user out and return to the login flow.

    Clears the header, workspace and sidebar, expires the ``User`` and
    ``Token`` cookies in the browser, blanks the user's token in the
    database, replaces ``self.current_user`` with a fresh ``User`` and
    calls ``generate_welcome``.
    """
    # self.log is only set by deferred_login; after a direct ("active") login
    # it is still None, so logging must not be allowed to abort the logout.
    if self.log:
        self.log.info("User logged off")
    self.log = None
    self.syslog = None
    self.head.clear()
    self.workspace.clear()
    self.sidebar.clear()
    # The Token cookie is written with a domain attribute at login. Expire it
    # both without and with that domain so the browser matches the stored cookie.
    cookie = pn.pane.HTML(f"""
    <script>
    document.cookie = "User=; expires=Thu, 01 Jan 1970 00:00:00 UTC;";
    document.cookie = "Token=; expires=Thu, 01 Jan 1970 00:00:00 UTC;";
    document.cookie = "Token=; expires=Thu, 01 Jan 1970 00:00:00 UTC; domain={config.DOMAIN}";
    </script>
    """)
    self.head.append(cookie)  # clear cookies
    # Delete the token in the database
    self.run_query("MATCH (u:User {UID:$uid}) SET u.token = ''", {"uid": self.current_user.UID})
    self.current_user = User()  # overwrite the existing User object to clean up all data
    self.generate_welcome()
def init_connections(self):
    """
    Open the graph-database driver and a session on the configured database.

    Address, credentials and database name are read from ``config``; the
    results are stored as ``self.driver`` and ``self.session``.
    """
    db_user, db_password = config.DB_AUTH
    self.driver = GraphDatabase.driver(config.DB_ADDR, auth=(db_user, db_password))
    self.session = self.driver.session(database=config.DB_NAME)
def run_query(self, query, params=None):
    """
    Run a Cypher query on the open session.

    Parameters
    ----------
    query : str
        The Cypher query to execute
    params : dict, optional
        Query parameters; an empty dict is used when omitted

    Returns
    -------
    result
        Whatever ``self.session.run`` returns for the query
    """
    query_params = {} if params is None else params
    return self.session.run(query, query_params)
def evaluate_query(self, query, params=None):
    """
    Run a Cypher query and return the first column of its single record.

    Parameters
    ----------
    query : str
        The Cypher query to execute
    params : dict, optional
        Query parameters; an empty dict is used when omitted

    Returns
    -------
    object
        ``record[0]`` of the record returned by ``single()``, or ``None``
        when that record is missing or falsy
    """
    query_params = {} if params is None else params
    record = self.session.run(query, query_params).single()
    return record[0] if record else None
def push_changes(self, node):
    """
    Write the properties of *node* to the matching node in the database.

    The database node is matched on ``UID`` (and on the labels, when the
    input provides any). All properties are sent as one map parameter and
    merged with ``SET n += $props``, so property names never appear in
    the query text. This keeps names with spaces or special characters
    valid and stops a property called ``uid`` from overwriting the match
    parameter. Nothing is sent when there are no properties.

    Parameters
    ----------
    node : dict or node-like object
        Node with properties to update. Objects with ``items()`` are read
        as a mapping (labels under the ``'labels'`` key); other objects
        are read through ``keys()``, ``labels`` and item access.
    """
    if hasattr(node, 'items'):
        # Dict-like object
        properties = {k: v for k, v in node.items() if k != 'labels'}
        labels = node.get('labels', [])
        uid = node.get('UID')
    else:
        # Node-like object from previous driver
        properties = {k: node[k] for k in node.keys() if k != 'UID'}
        labels = list(node.labels)
        uid = node['UID']

    if not properties:
        return

    # Labels cannot be parameters; backtick-quote them (doubling embedded
    # backticks) so that unusual label names stay valid Cypher.
    label_clause = ''.join(
        ':`' + str(label).replace('`', '``') + '`' for label in (labels or []))
    query = "MATCH (n" + label_clause + " {UID: $uid}) SET n += $props"
    self.run_query(query, {"uid": uid, "props": properties})
    return
def where_is_file(self,UID):
    """
    Return the file-store directory of a picture.

    Reads the ``Storage`` property of the ``Picture`` node with the given
    UID. The UID is passed as a query parameter rather than spliced into
    the query text.

    Parameters
    ----------
    UID : str
        The UID of the file to find

    Returns
    -------
    store : str
        ``"AllFileStore/"`` when no storage is recorded or it is
        ``'primary'``, otherwise ``"AllFileStore_<storage>/"``
    """
    store = self.evaluate_query(
        "match (p:Picture {UID:$uid}) return p.Storage",
        {"uid": str(UID)})
    if store is None or store == 'primary':
        return "AllFileStore/"
    return "AllFileStore_" + store + "/"
def action_from_search(self,event):
    """
    Event handler for a change of the search page's output UID.

    Ignores empty values. Depending on ``self.search_page.action_selected``
    the output UID is reset (``"view"``) or the study containing the
    picture is looked up, the search page is reset and a viewbox is
    created (``"view_redirect"``).

    Parameters
    ----------
    event : object
        Event object containing a UID in the event.new property
    """
    # NOTE(review): the nesting of the statements below was reconstructed;
    # confirm against the original file.
    if event.new!="":
        if self.search_page.action_selected=="view":
            #self.create_carrousel(event.new)
            self.search_page.output_uid=""
        if self.search_page.action_selected=="view_redirect":
            # NOTE(review): `study` is only used by the commented-out call below.
            study = self.evaluate_query(f"MATCH (s:Study)-[*]->(:Picture {{UID:'{event.new}'}}) RETURN s.UID")
            #self.create_carrousel(study)
            self.search_page.output_uid=""
            self.search_page.action_selected=''
            # block_UID = self.graph.run(f"MATCH (p:Parblock)-[*2]->(:Picture {{UID:'{event.new}'}}) RETURN p.UID")
            # NOTE(review): self.image_uid is not assigned anywhere in the visible
            # code (the assignment in __init__ is commented out) - verify it exists.
            self.create_viewbox(self.image_uid)
    return
def choose_widget(self,target,f,access_state='R'):
    """
    Build the edit widget for field *f* of a node or relationship.

    The template for the target's type is looked up in
    ``self.template_tree``. The JSON field specification stored there
    (``type``, optional ``options``, ``lower``, ``upper``, ``tab``)
    decides which widget is created. Without a template, or without a
    specification for the field, a plain text input is used.

    For relationships the type name is used as the template key. The type
    is a single string, so it is wrapped in a list instead of being split
    into characters (which made the template lookup miss).

    Parameters
    ----------
    target : node or relationship
        Object providing ``labels`` or ``type`` and item access to its properties
    f : str
        Name of the field to edit
    access_state : str
        ``'W'`` creates an enabled widget, anything else a disabled one

    Returns
    -------
    tuple
        ``(widget, modus, free_field, tab)``: the widget, the field type
        name, whether the field has no template specification, and the
        tab the field belongs to
    """
    # Determine the template key: first label of a node, or the relationship type.
    try:
        label = list(target.labels)
    except Exception:
        rel_type = target.type
        label = [rel_type] if isinstance(rel_type, str) else list(rel_type)
    if 'Template' in label:
        label.remove('Template')
    template = self.template_tree[label[0]]

    widget_state = access_state != 'W'  # True means the widget is disabled
    tab = 'General'

    def text_input(disabled=widget_state):
        return pn.widgets.TextInput(name=f, value=target[f], disabled=disabled)

    if template is None:
        # No template for this type: plain text, identifiers stay locked.
        modus = 'text'
        if f in ['UID', 'RO']:
            w = text_input(disabled=True)
            free_field = False
        else:
            w = text_input()
            free_field = True
        return w, modus, free_field, tab

    field_type = template['data_fields'][f]
    if field_type is None or field_type == '':
        field_type = '{"type":"text"}'
        free_field = True
        tab = 'General'
    else:
        free_field = False
    try:
        field_dict = json.loads(field_type)
        modus = field_dict['type']
        tab = field_dict.get('tab', 'General')
    except Exception:
        # Unparseable or incomplete specification: fall back to plain text.
        field_dict = {}
        modus = 'text'
        tab = 'General'

    has_options = field_dict.get('options', '') != ''

    def option_select():
        return pn.widgets.Select(name=f, options=field_dict['options'].split(','), value=target[f], disabled=widget_state)

    has_range = field_dict.get('lower', '') != '' and field_dict.get('upper', '') != ''

    for case in switch(modus):
        if case('text'):
            if has_options:
                w = option_select()
            elif f in ['UID', 'RO']:
                w = text_input(disabled=True)
                free_field = False
            else:
                w = text_input()
            return w, modus, free_field, tab
        if case('int'):
            if has_options:
                w = option_select()
            elif has_range:
                try:
                    w = pn.widgets.IntSlider(name=f, value=self.safe_int(target[f]), start=self.safe_int(field_dict['lower']), end=self.safe_int(field_dict['upper']), step=1, disabled=widget_state)
                except Exception:
                    w = text_input()
            else:
                try:
                    w = pn.widgets.IntInput(name=f, value=self.safe_int(target[f]), disabled=widget_state)
                except Exception:
                    w = text_input()
            return w, modus, free_field, tab
        if case('float'):
            if has_options:
                w = option_select()
            elif has_range:
                try:
                    w = pn.widgets.FloatSlider(name=f, value=self.safe_float(target[f].replace(',', '.')), start=self.safe_float(field_dict['lower']), end=self.safe_float(field_dict['upper']), disabled=widget_state)
                except Exception:
                    w = text_input()
            else:
                try:
                    w = pn.widgets.FloatInput(name=f, value=self.safe_float(target[f].replace(',', '.')), disabled=widget_state)
                except Exception:
                    w = text_input()
            return w, modus, free_field, tab
        if case('date'):
            if has_options:
                w = option_select()
            else:
                value = target[f]
                try:
                    # Stored dates may use '_' as separator.
                    datevalue = dt.date.fromisoformat(value.replace('_', '-'))
                    w = pn.widgets.DatePicker(name=f, value=datevalue, disabled=widget_state)
                except Exception:
                    w = text_input()
            return w, modus, free_field, tab
        if case('datetime'):
            if has_options:
                w = option_select()
            else:
                value = target[f]
                try:
                    datevalue = dt.datetime.fromisoformat(value.replace('_', '-'))
                    w = pn.widgets.DatetimePicker(name=f, value=datevalue, disabled=widget_state)
                except Exception:
                    w = text_input()
            return w, modus, free_field, tab
        if case('check'):
            value = target[f]
            checkvalue = value.lower() == 'yes'
            w = pn.widgets.Checkbox(name=f, value=checkvalue, disabled=widget_state)
            return w, modus, free_field, tab
        if case('multichoice'):
            value = target[f]
            selectvalue = value.split(',')
            options = field_dict['options'].split(',')
            w = pn.widgets.MultiChoice(name=f, value=selectvalue, options=options, disabled=widget_state)
            return w, modus, free_field, tab
        else:
            # Unknown type: a select when options are given, otherwise plain text.
            if has_options:
                w = option_select()
            else:
                w = text_input()
            return w, modus, free_field, tab
def choose_filter_widget(self,label,f,value):
    """
    Build the Panel widget used to edit the filter value of one field.

    The widget type follows the field definition stored in the template of
    ``label``: a JSON string with a ``type`` key and optional ``options``,
    ``lower`` and ``upper`` keys. Whenever a typed widget cannot be built
    from ``value`` a plain ``TextInput`` is returned instead.

    Parameters
    ----------
    label : str
        Key of the template in ``self.template_tree``.
    f : str
        Field name; also used as the widget name.
    value : object
        Current filter value. Strings are parsed according to the field
        type; bool, list/tuple and date/datetime values are used as they are.

    Returns
    -------
    tuple
        ``(widget, modus, free_field)``. ``modus`` is the field type that was
        applied. ``free_field`` is True when the field has no type definition
        (or no template exists) and the field is not ``UID``/``RO``.
    """
    print("filter widget inputs ",label,f,value,type(value))
    widget_state=False  # filter widgets are editable; only UID/RO are locked

    def text_input(disabled=widget_state):
        return pn.widgets.TextInput(name=f,value=value,disabled=disabled)

    # Unknown labels are treated like labels without a template.
    template=self.template_tree.get(label)
    if template is None:
        locked=f in ['UID','RO']
        return text_input(disabled=locked),'text',not locked

    # A field that is missing from the template is handled as an untyped field.
    field_type=template['data_fields'].get(f)
    free_field=field_type is None or field_type==''
    if free_field:
        field_type='{"type":"text"}'
    try:
        field_dict=json.loads(field_type)
        modus=field_dict['type']
    except (TypeError,ValueError,KeyError):
        # Malformed definition: ignore it completely and fall back to text.
        field_dict={}
        modus='text'

    if modus=='check':
        checked=value if isinstance(value,bool) else value.lower()=='yes'
        w=pn.widgets.Checkbox(name=f,value=checked,disabled=widget_state)
        return w,modus,free_field

    if modus=='multichoice':
        selected=list(value) if isinstance(value,(list,tuple)) else value.split(',')
        options=field_dict['options'].split(',')
        w=pn.widgets.MultiChoice(name=f,value=selected,options=options,disabled=widget_state)
        return w,modus,free_field

    # Every other type is rendered as a drop-down when options are predefined.
    if field_dict.get('options','')!='':
        options=field_dict['options'].split(',')
        w=pn.widgets.Select(name=f,options=options,value=value,disabled=widget_state)
        return w,modus,free_field

    if modus=='text':
        if f in ['UID','RO']:
            return text_input(disabled=True),modus,False
        return text_input(),modus,free_field

    if modus not in ('int','float','date','datetime'):
        return text_input(),modus,free_field

    bounded=field_dict.get('lower','')!='' and field_dict.get('upper','')!=''
    try:
        if modus=='int':
            if bounded:
                w=pn.widgets.IntSlider(name=f,value=self.safe_int(value),start=self.safe_int(field_dict['lower']),end=self.safe_int(field_dict['upper']),step=1,disabled=widget_state)
            else:
                w=pn.widgets.IntInput(name=f,value=self.safe_int(value),disabled=widget_state)
        elif modus=='float':
            # Accept a decimal comma; str() also covers already numeric values.
            number=self.safe_float(str(value).replace(',','.'))
            if bounded:
                w=pn.widgets.FloatSlider(name=f,value=number,start=self.safe_float(field_dict['lower']),end=self.safe_float(field_dict['upper']),disabled=widget_state)
            else:
                w=pn.widgets.FloatInput(name=f,value=number,disabled=widget_state)
        elif modus=='date':
            if isinstance(value,dt.datetime):
                datevalue=value.date()
            elif isinstance(value,dt.date):
                datevalue=value
            else:
                datevalue=dt.date.fromisoformat(value.replace('_','-'))
            w=pn.widgets.DatePicker(name=f,value=datevalue,disabled=widget_state)
        else:
            if isinstance(value,dt.datetime):
                datevalue=value
            else:
                datevalue=dt.datetime.fromisoformat(value.replace('_','-'))
            w=pn.widgets.DatetimePicker(name=f,value=datevalue,disabled=widget_state)
    except Exception:
        # Best effort: any value the typed widget rejects is shown as text.
        w=text_input()
    return w,modus,free_field
def save_object(self,event):
    """
    Store the new value of an edited field, converted per the template type.

    Parameters
    ----------
    event : param event
        ``event.obj.target`` is the edited node (must provide ``labels`` and
        be convertible with ``dict()``), ``event.obj.field`` the field name
        and ``event.new`` the new widget value.

    Notes
    -----
    Values are stored as strings: dates as ISO strings, checkboxes as
    ``'yes'``/``'no'``, multichoice as a comma separated list and passwords
    as a SHA-256 hex digest. Fields of ``Template`` nodes, untyped fields,
    fields missing from the template and malformed type definitions are
    stored with ``str()``.
    """
    target=dict(event.obj.target)
    field=event.obj.field
    new_value=event.new
    labels=list(event.obj.target.labels)

    def store(value):
        target[field]=value
        self.push_changes(target)

    if "Template" in labels:
        store(str(new_value))
        return
    template=self.template_tree[labels[0]]
    if template is None:
        # NOTE(review): as before, nothing is saved for labels whose template
        # entry is None - confirm that this is intended.
        return

    # Same fallback rules as the widget builders: no/empty/broken type -> text.
    field_type=template['data_fields'].get(field)
    if field_type is None or field_type=='':
        field_type='{"type":"text"}'
    try:
        modus=json.loads(field_type)['type']
    except (TypeError,ValueError,KeyError):
        modus='text'

    if modus=='date':
        # The widget may have been a TextInput/Select fallback, in which case
        # the value is already a string rather than a date.
        if isinstance(new_value,dt.date):
            store(dt.date.isoformat(new_value))
        else:
            store('' if new_value is None else str(new_value))
    elif modus=='datetime':
        if isinstance(new_value,dt.datetime):
            store(dt.datetime.isoformat(new_value))
        else:
            store('' if new_value is None else str(new_value))
    elif modus=='check':
        store('yes' if new_value else 'no')
    elif modus=='multichoice':
        store(",".join(new_value))
    elif modus=='password':
        store(hashlib.sha256(new_value.encode('utf-8')).hexdigest())
    else:
        # 'text', 'int', 'float' and any unknown type
        store(str(new_value))
    return
def add_new_field(self,event):
    """
    Add a free field to the node shown in the widget and refresh its viewbox.

    Reads ``New_label``/``New_value`` from ``event.obj``; an empty label is
    ignored. The node is copied into a plain dict before the new field is
    added and pushed with ``self.push_changes``.
    """
    widget=event.obj
    node_data=dict(widget.target)
    new_label=widget.New_label
    if new_label=='':
        return
    node_data[new_label]=widget.New_value
    self.push_changes(node_data)
    self.create_viewbox(widget.UID)
    return
def add_new_child(self,event):
    """
    Create a child node below the node shown in the widget and open it.

    Parameters
    ----------
    event : param event
        ``event.obj`` carries ``target`` (parent node), ``UID`` (parent UID),
        ``template`` (truthy when label and relation come from separate
        inputs), ``New_node`` and, in template mode, ``New_relation``.
        Outside template mode ``New_node`` is ``"Label:RELATION"``.

    Notes
    -----
    For a ``Template`` parent only an empty ``Template`` child is merged.
    Otherwise the child node and the relationship are pre-filled with one
    default value per field of their templates. Fields without a type are
    treated as text; fields with an unknown type get no default.
    The query is built by string concatenation, so labels, relation names
    and field names must come from trusted input.
    """
    target=event.obj.target
    if event.obj.template:
        node=event.obj.New_node
        relation=event.obj.New_relation
    else:
        # A missing ':' means "no relation given" instead of an IndexError.
        parts=event.obj.New_node.split(':')
        node=parts[0]
        relation=parts[1] if len(parts)>1 else ''
    if node=='':
        return
    if relation=='':
        relation='CHILD'
    new_uid=str(uuid4())
    now=dt.datetime.now()
    # Default literal per field type. %H keeps the time on the 24-hour clock
    # (the former %I dropped the AM/PM information).
    default_literals={
        'text':"''",
        'int':"'0'",
        'float':"'0'",
        'date':"'"+now.strftime("%Y-%m-%d")+"'",
        'datetime':"'"+now.strftime("%Y-%m-%dT%H:%M:%S")+"'",
        'check':"'no'",
        'multichoice':"''",
        'password':"''",
    }

    def default_properties(template):
        """Return the ``key:value`` Cypher fragments for a template's fields."""
        if template is None:
            return []
        fragments=[]
        for f in template['data_fields'].keys():
            raw=template['data_fields'][f]
            try:
                modus='text' if raw is None or raw=='' else json.loads(raw)['type']
            except (TypeError,ValueError,KeyError):
                modus='text'
            literal=default_literals.get(modus)
            if literal is not None:
                # Fragments are joined later, so a skipped field can no
                # longer leave a dangling comma in the query.
                fragments.append("`"+str(f)+"`:"+literal)
        return fragments

    rel_type=str(relation).upper()
    if 'Template' in target.labels:
        self.run_query("match (x {UID:'"+event.obj.UID+"'}) merge (x)-[:"+rel_type+"]->(n:"+node+":Template {UID:'"+new_uid+"'})")
    else:
        node_props=["UID:'"+new_uid+"'"]+default_properties(self.template_tree[node])
        rel_props=default_properties(self.template_tree[relation])
        self.run_query("match (x {UID:'"+event.obj.UID+"'}) merge (x)-[:"+rel_type+" {"+",".join(rel_props)+"}]->(n:"+node+" {"+",".join(node_props)+"})")
    print("updating access tokens")
    self.update_access_tokens(new_uid)
    print("finished updating access tokens")
    self.create_viewbox(new_uid)
    return
def link_to_object(self,startnode,endnode,relation,direction,is_template=False):
    """
    Merge a relationship ``startnode -> endnode`` and refresh the navigation.

    Parameters
    ----------
    startnode, endnode : str
        UIDs of the two nodes to connect.
    relation : str
        Relationship name; upper-cased for the relationship type and used
        as-is to look up the relationship template.
    direction : str
        ``'up'`` returns the end node and passes it to ``update_upnav``; any
        other value returns the start node and passes it to
        ``update_downnav``.
    is_template : bool, optional
        When True the relationship is merged without properties and no
        relationship template is looked up.

    Notes
    -----
    Relationship properties get one default per template field. Fields
    without a type are treated as text; unknown types get no default.
    The query is built by string concatenation, so the arguments must come
    from trusted input.
    """
    rel_type=str(relation).upper()
    if is_template:
        properties=""
    else:
        now=dt.datetime.now()
        # %H keeps the time on the 24-hour clock (the former %I did not).
        default_literals={
            'text':"''",
            'int':"'0'",
            'float':"'0'",
            'date':"'"+now.strftime("%Y-%m-%d")+"'",
            'datetime':"'"+now.strftime("%Y-%m-%dT%H:%M:%S")+"'",
            'check':"'no'",
            'multichoice':"''",
            'password':"''",
        }
        template=self.template_tree[relation]
        fragments=[]
        for f in template['data_fields'].keys():
            raw=template['data_fields'][f]
            try:
                modus='text' if raw is None or raw=='' else json.loads(raw)['type']
            except (TypeError,ValueError,KeyError):
                modus='text'
            literal=default_literals.get(modus)
            if literal is not None:
                # Joined below, so skipped fields cannot leave stray commas.
                fragments.append("`"+str(f)+"`:"+literal)
        properties=" {"+",".join(fragments)+"}"

    upwards=direction=='up'
    returned='y' if upwards else 'x'
    out=self.evaluate_query("match (x {UID:'"+startnode+"'}),(y {UID:'"+endnode+"'}) merge (x)-[:"+rel_type+properties+"]->(y) return "+returned)
    if upwards:
        self.update_upnav(out)
    else:
        self.update_downnav(out)
    self.update_access_tokens(startnode)
    self.update_access_tokens(endnode)
    return
def remove_node(self,event):
    """
    Delete the node shown in the widget and open one of its parents.

    The UID of one node pointing at the deleted node is looked up before the
    node and its relationships are removed; the viewbox is then rebuilt for
    that UID.
    """
    uid=event.obj.UID
    find_parent="".join(["match (x)-->(n {UID:'",uid,"'}) return x.UID limit 1"])
    delete_node="".join(["match (n {UID:'",uid,"'}) detach delete n"])
    parent_node=self.evaluate_query(find_parent)
    self.run_query(delete_node)
    self.create_viewbox(parent_node)
    #self.create_carrousel(self.top_UID)
    return
def delete_field(self,event):
    """
    Remove one property from the node shown in the widget and refresh it.

    ``event.obj.target['UID']`` identifies the node and ``event.obj.field``
    names the property to remove.
    """
    widget=event.obj
    query="".join(["match (x {UID:'",widget.target['UID'],"'}) remove x.`",widget.field,"`"])
    self.run_query(query)
    self.create_viewbox(widget.target['UID'])
    return
def find_edges(self,UID):
    """
    Collect the node labels connected to the templates of the given nodes.

    For each UID the first label of the node is looked up; all further
    queries run against the ``Template`` node carrying that label: its
    ancestors, its descendants and the ancestors of its leaf nodes.

    Parameters
    ----------
    UID : str or list of str
        One UID or a list of UIDs.

    Returns
    -------
    list of str
        Distinct labels found, without ``Template``, ``DBAdmin``, ``People``,
        ``Projects`` and ``Compounds``. The order is not defined.

    Notes
    -----
    Every query after the label lookup depends on the label only, so each
    label is processed once even when several UIDs share it. UIDs that match
    no node are skipped.
    """
    if not isinstance(UID, list):
        UID=[UID]
    hidden={'Template','DBAdmin','People','Projects','Compounds'}
    connectome=set()
    handled_labels=set()
    for u in UID:
        u_label=self.evaluate_query("match (t {UID:'"+u+"'}) return labels(t)[0]")
        if u_label is None or u_label in handled_labels:
            continue
        handled_labels.add(u_label)
        anchor="(t:Template:"+u_label+")"
        # Leaves of the template tree below this label
        edge=self.evaluate_query("match "+anchor+"-[*]->(x) where not exists((x)-->()) return collect(distinct x.UID)") or []
        # Connectome in three steps: target up, target down and edges up
        label_lists=list(self.evaluate_query("match "+anchor+"<-[*]-(x) return collect(labels(x))") or [])
        label_lists.extend(self.evaluate_query("match "+anchor+"-[*]->(x) return collect(labels(x))") or [])
        for e in edge:
            label_lists.extend(self.evaluate_query("match (t {UID:'"+e+"'})<-[*]-(x) return collect(labels(x))") or [])
        for labels in label_lists:
            connectome.update(labels)
    return list(connectome-hidden)
def takesecondkey(self,elem):
    """Sort key helper: return the item at index 1 of ``elem``."""
    second=elem[1]
    return second
def make_expspace_links(self,event):
    """
    Re-colour the experimental space panel after one of its buttons is clicked.

    The clicked button (``event.obj``) provides ``objtype`` (category key),
    ``name`` (the clicked value) and ``startpoint`` (UID the panel was built
    from). For every category the values that occur together with the
    clicked value on the LNP nodes below ``startpoint`` are queried. Each
    entry of ``self.expspace`` is then marked ``'sel'`` (the clicked value),
    ``'yes'`` (co-occurring) or ``'no'`` and the panel is rebuilt.

    Raises
    ------
    ValueError
        If ``event.obj.objtype`` is not one of the known categories.

    Notes
    -----
    Queries are built by string concatenation; the button names originate
    from database values.
    """
    # key, row title, filter prefix, filter suffix, match pattern, collected value
    categories=(
        ('mrnas',"mRNA","exists ((:mRNA {Name:'","'})-->(x))","(:Class {Name:'mRNA'})-->(c:mRNA)-->(x)","c.Name"),
        ('ils',"IL","exists ((:Compound {Name:'","'})-->(x))","(:Class {Name:'IL'})-->(c:Compound)-->(x)","c.Name"),
        ('hls',"Helper Lipid","exists ((:Compound {Name:'","'})-->(x))","(:Class {Name:'helper lipids'})-->(c:Compound)-->(x)","c.Name"),
        ('buffers',"Buffer","exists ((:Compound {Name:'","'})-[:BUFFER]->(x))","(:Class {Name:'buffers'})-->(c:Compound)-[:BUFFER]->(x)","c.Name"),
        ('mixers',"Mixing","exists ((:Compound {Name:'","'})-[:MIXING]->(x))","(:Class {Name:'buffers'})-->(c:Compound)-[:MIXING]->(x)","c.Name"),
        ('ilratios',"IL Ratios","exists ((:Compound)-[:IL {Ratio:'","'}]->(x))","(:Class {Name:'IL'})-->(c:Compound)-[r]->(x)","r.Ratio"),
        ('payloads',"Payload","exists ((:mRNA)-[:Payload {`Amount_[µg]`:'","'}]->(x))","(:Class {Name:'mRNA'})-->(c:mRNA)-[r]->(x)","r.`Amount_[µg]`"),
        ('tfrs',"TFR","exists ((:Experiment)-[:RUN {TFR:'","'}]->(x))","(e:Experiment)-[r]->(x)","r.TFR"),
        ('frrs',"FRR","exists ((:Experiment)-[:RUN {FRR:'","'}]->(x))","(e:Experiment)-[r]->(x)","r.FRR"),
        ('pkas',"pKa obtained","x.pKa='","'","(x)","x.pKa"),
        ('sizes',"Malvern Sizes obtained","x.`Malvern_size_(d.nm)`='","'","(x)","x.`Malvern_size_(d.nm)`"),
    )
    clicked_name=event.obj.name
    objtype=event.obj.objtype
    # Construct filter string
    filter_string=None
    for key,_,prefix,suffix,_,_ in categories:
        if key==objtype:
            filter_string=prefix+clicked_name+suffix
    if filter_string is None:
        raise ValueError("unknown experimental space category: "+repr(objtype))
    # Check which buttons need to light up
    startpoint=event.obj.startpoint
    edges=self.evaluate_query("match (t {UID:'"+startpoint+"'})-[*]->(x:LNP) return collect(distinct x.UID)")
    condition=" where x.UID in ['"+"','".join(edges)+"'] and "+filter_string
    # Values per category that occur together with the clicked value
    found={}
    for key,_,_,_,pattern,collected in categories:
        found[key]=self.evaluate_query("match "+pattern+condition+" return collect (distinct "+collected+")")
    for key,_,_,_,_,_ in categories:
        entries=self.expspace[key]
        matches=found[key]
        # Slice assignment keeps the list object stored in self.expspace.
        entries[:]=[
            (item[0],('sel' if item[0]==clicked_name else 'yes') if item[0] in matches else 'no')
            for item in entries
        ]
    self.floatpanel_report.clear()
    self.workspace.pop(-1)
    close_bt=pn.widgets.Button(name="close panel",button_type="danger")
    close_bt.window="self.workspace"
    close_bt.floater="self.floatpanel_report"
    close_bt.on_click(self.floater_close)
    self.floatpanel_report.append(close_bt)
    start_name=self.evaluate_query("match (x {UID:'"+startpoint+"'}) return x.Name")
    html_pane = pn.pane.HTML("<h1>Experimental space from "+start_name+"</h1>")
    self.floatpanel_report.append(html_pane)
    for key,title,_,_,_,_ in categories:
        self.floatpanel_report.append(self.button_row_generate(title,key,startpoint))
    self.workspace.append(self.floatpanel_report)
    return
def button_row_generate(self,title,objtype,startpoint):
    """
    Build the titled button grid for one experimental space category.

    Parameters
    ----------
    title : str
        Heading placed above the buttons.
    objtype : str
        Key into ``self.expspace``; its entries are ``(name, state)`` pairs.
    startpoint : str
        UID stored on every button for ``make_expspace_links``.

    Returns
    -------
    panel.Column
        The title followed by rows of at most four buttons. State ``'no'``
        is shown as ``primary``, ``'sel'`` as ``danger`` and anything else
        as ``success``. Button names are cut to 20 characters.
    """
    per_row=4
    state_styles={'no':'primary','sel':'danger'}
    out=pn.Column()
    out.append(title)
    bts=self.expspace[objtype]
    # Step through the entries in chunks, so no index runs past the end and
    # no empty trailing row is produced when the count is a multiple of four.
    for first in range(0,len(bts),per_row):
        row=pn.Row()
        for entry in bts[first:first+per_row]:
            try:
                bt=pn.widgets.Button(name=entry[0][:20],button_type=state_styles.get(entry[1],'success'),width=200)
                bt.objtype=objtype
                bt.startpoint=startpoint
                bt.on_click(self.make_expspace_links)
                row.append(bt)
                row.append(pn.Spacer(width=10))
            except Exception as e:
                # Best effort as before: a malformed entry is reported and skipped.
                print(e)
        out.append(row)
    return out
def remove_from_cart(self,event):
    """
    Drop the selected row from the shopping cart and rebuild the report panel.

    ``event.obj.selected`` is the index label to drop. The cart is
    renumbered from 0, stored as JSON in the ``cart`` property of the
    current user's node and the report panel is regenerated.
    """
    selection=event.obj.selected
    print("coming in ",selection)
    cart=self.shopping_cart
    cart.drop(selection,inplace=True)
    cart.index=range(len(cart))
    g_text="".join(["match (u:User {UID:'",self.current_user.UID,"'}) set u.cart='",cart.to_json(),"'"])
    self.run_query(g_text)
    self.report_from_cart()
    return
def report_from_cart(self,event=None):
# Rebuild the floating "Report" panel for the current shopping cart.
# The panel shows the cart as a table with buttons to remove the selected row
# (" << "), to open the selected row's object ("GO") and to close the panel,
# followed by the report controls: value / categories / filters selectors,
# keyword and query boxes, a report-type selector and the "Generate" button.
# `event` is not read by the active code below (its only use is commented out).
# Stylesheet passed to the " << " button.
button_style = """
.CPathBtn3,
.CPathBtn2 .bk-btn-group .bk-btn,
.CPathBtn2 .bk-input-group .bk-btn-default {
background-color: #aa64cf;
border: none;
border-radius: 5px;
color: white;
transition-duration: 0.4s;
min-height: 30px;
min-width: 100px;
font-size: 16px;
}
.CPathBtn3,
.CPathBtn2 .bk-btn-group .bk-btn:hover,
.CPathBtn2 .bk-input-group .bk-btn-default:hover {
background-color: #3f0b5b;
padding: 5px 10px;
font-size: 18px;
height: 110%;
}
"""
# Reuse the existing float panel when possible. If clearing it or popping the
# last workspace item raises (presumably because the panel does not exist yet
# - TODO confirm), a new FloatPanel is created instead.
try:
self.floatpanel_report.clear()
self.workspace.pop(-1)
#print("clearing the panel")
except:
config = {"headerControls": {"maximize" : "remove","close" : "remove"}}
self.floatpanel_report=pn.layout.FloatPanel(name='Report', contained=False, margin=20,height=900,position="center",config=config)
#tabulator_filters = {
# field:{'placeholder':'Enter '+field+' Name'}
# }
# Read-only table of the cart with single-row checkbox selection; the UID column is hidden.
study_overview_wd = pn.widgets.Tabulator(self.shopping_cart, layout='fit_columns', theme='fast', align='center', sizing_mode='stretch_height',
show_index=False, disabled=True, selectable='checkbox-single', width=1000,
hidden_columns=['UID'],
pagination=None,sortable=False,stylesheets=['./assets/stylesheets/tabulator.css'])
del_bt = pn.widgets.Button(name=" << ", button_type = "primary",width=75, stylesheets=[button_style])
del_bt.on_click(self.remove_from_cart)
# Link callback: stores the first entry of the table's new selection on the
# linked button as `.selected`.
def callback(target, event):
#print("incoming event ",event.new)
target.selected = event.new[0]
study_overview_wd.link(del_bt, callbacks={'selection': callback})
# The close button carries the names of the container and of the panel as
# strings; they are consumed by self.floater_close (not visible here).
close_bt=pn.widgets.Button(name="close panel",button_type="danger")
close_bt.window="self.workspace"
close_bt.floater="self.floatpanel_report"
close_bt.on_click(self.floater_close)
self.floatpanel_report.append(close_bt)
self.floatpanel_report.append(study_overview_wd)
# GO handler: removes the last workspace item, deletes the report panel
# attribute and opens the viewbox of the selected cart row.
def go_to_object(e):
print("current table ",study_overview_wd.value)
row = study_overview_wd.value.loc[e.obj.selected]
self.workspace.pop(-1)
handback_statement="self.create_viewbox(row['UID'])"
parameter=handback_statement
del self.floatpanel_report
# eval of a fixed string with explicit self/row bindings; it evaluates
# the call self.create_viewbox(row['UID']).
eval(parameter,{"self":self,"row":row})
go_bt=pn.widgets.Button(name='GO',button_type='success')
study_overview_wd.link(go_bt, callbacks={'selection': callback})
go_bt.on_click(go_to_object)
self.floatpanel_report.append(pn.Row(del_bt,go_bt))
#self.workspace.append(self.floatpanel_report)
# All UIDs currently in the cart are the starting points for the connectome.
startpoint=self.shopping_cart.loc[:,"UID"].values.tolist()
print(startpoint)
#startpoint=event.obj.target
connectome=self.find_edges(startpoint)
# Collect ['node', 'Label:field'] entries for every connectome label that has
# an entry in the template tree, skipping the field named 'Keys'.
self.node_report_fields=[]
relationships_in_scope=[]
for n in connectome:
nodedata=self.template_tree.get(n,'')
if nodedata!='':
fields=nodedata.get('data_fields',{})
if fields!={}:
for f in fields.keys():
if f!='Keys':
self.node_report_fields.append(['node',n+":"+f])
#Get relevant relationships
# The second element of each uplink/downlink entry is used as relationship name.
relationships_in_scope.extend([i[1] for i in nodedata.get('uplinks')])
relationships_in_scope.extend([i[1] for i in nodedata.get('downlinks')])
# Same collection for the relationships, stored as ['link', 'Relation:field'].
for r in set(relationships_in_scope):
nodedata=self.template_tree.get(r,'')
if nodedata!='':
fields=nodedata.get('data_fields',{})
if fields!={}:
for f in fields.keys():
if f!='Keys':
self.node_report_fields.append(['link',r+":"+f])
# Sort by the 'Label:field' string (index 1 of each entry).
self.node_report_fields.sort(key=self.takesecondkey)
#print("prestored values ",self.selected_data,self.selected_categories,self.selected_filter)
options=[i[1] for i in self.node_report_fields]
self.generate_bt=pn.widgets.Button(name="Generate",button_type="primary")
# NOTE(review): key_options is not referenced again within this method.
key_options=sorted(self.evaluate_query("match (x:Keyword) where not 'Template' in labels(x) return collect (distinct x.Name)") )
self.keywords_box=pn.widgets.TextInput(name="Keywords",value=self.keywords_selected)
self.query_box=pn.widgets.TextAreaInput(name="Query",value=self.llm_query,auto_grow=True,resizable='both',max_rows=10, rows=6)
# Value selector: at most one field (max_items=1), pre-filled from
# self.selected_data when that is set.
if self.selected_data!=None:
select_value=pn.widgets.MultiChoice(name="value",options=options,value=self.selected_data,search_option_limit=50,max_items=1)
self.generate_bt.datavalue=self.selected_data
else:
select_value=pn.widgets.MultiChoice(name="value",options=options,search_option_limit=50,max_items=1)
self.generate_bt.datavalue=[]
# Categories selector, pre-filled with the keys of self.selected_categories;
# changes are handled by self.categories_update.
if self.selected_categories!={}:
values=[i for i in self.selected_categories.keys()]
select_categories=pn.widgets.MultiChoice(name="categories",options=options,value=values,search_option_limit=50)
select_categories.watch_type="field"
select_categories.param.watch(self.categories_update, ['value'], onlychanged=True)
#self.generate_bt.categories=self.selected_categories
else:
select_categories=pn.widgets.MultiChoice(name="categories",options=options,search_option_limit=50)
select_categories.watch_type="field"
select_categories.param.watch(self.categories_update, ['value'], onlychanged=True)
#self.generate_bt.categories=[]
# One Select per chosen category; its options are "any" plus the second
# element of each downlink entry of the category's label.
categories=pn.Column()
for f in self.selected_categories.keys():
cat_options=["any"]
cat_options.extend([i[1] for i in self.template_tree.get(f.split(':')[0],{}).get('downlinks',[])])
filter_value=pn.widgets.Select(name=f.split(':')[0]+":"+f.split(':')[1],options=cat_options,value=self.selected_categories[f])
#filter_value,_,_=self.choose_filter_widget(f.split(':')[0],f.split(':')[1],self.selected_categories[f])
filter_value.watch_type="value"
filter_value.field=f
filter_value.param.watch(self.categories_update, ['value'], onlychanged=True)
categories.append(filter_value)
# Filters selector, pre-filled with the keys of self.selected_filter;
# changes are handled by self.filter_update.
if self.selected_filter!={}:
values=[i for i in self.selected_filter.keys()]
select_filters=pn.widgets.MultiChoice(name="filters",options=options,value=values,search_option_limit=50)
select_filters.watch_type="field"
select_filters.param.watch(self.filter_update, ['value'], onlychanged=True)
#self.generate_bt.filters=self.selected_filter
else:
select_filters=pn.widgets.MultiChoice(name="filters",options=options,search_option_limit=50)
select_filters.watch_type="field"
select_filters.param.watch(self.filter_update, ['value'], onlychanged=True)
#self.generate_bt.filters=[]
# One typed widget per chosen filter field, built by choose_filter_widget
# from the 'Label:field' key and the stored filter value.
filters=pn.Column()
for f in self.selected_filter.keys():
filter_value,_,_=self.choose_filter_widget(f.split(':')[0],f.split(':')[1],self.selected_filter[f])
filter_value.watch_type="value"
filter_value.field=f
filter_value.param.watch(self.filter_update, ['value'], onlychanged=True)
filters.append(filter_value)
# The Generate button carries the cart UIDs as `target`; link() maps the
# value selector to its `datavalue` and the report type selector to its
# `report_type` (initialised to 'Boxplot'). A click calls self.generate_report.
self.generate_bt.target=startpoint
select_value.link(self.generate_bt, value='datavalue')
#select_categories.link(self.generate_bt,value='categories')
#select_filters.link(self.generate_bt,value='filters')
self.generate_bt.on_click(self.generate_report)
report_items = ['Boxplot', 'Violin','Dataframe','LLM']
report_type=pn.widgets.Select(name="Report type", options=report_items,value='Boxplot')
report_type.link(self.generate_bt, value='report_type')
self.generate_bt.report_type='Boxplot'
self.floatpanel_report.append(pn.Column(select_value,select_categories,categories,select_filters,filters,self.keywords_box,self.query_box,report_type,self.generate_bt))
self.workspace.append(self.floatpanel_report)
return
def push_keyword(self,event):
    """
    Add the newly selected keyword hint to the keywords box.

    The new value replaces an empty box; otherwise it is appended after a
    comma.
    """
    box=self.keywords_box
    current=box.value
    box.value=event.new if current=='' else current+','+event.new
    return
def hint_keyword(self,event):
    """
    Refresh the keyword hints for the text typed into the keywords box.

    ``event.new`` is split on commas; for every fragment the ``Keyword``
    names containing it (case-insensitive) are queried and offered, sorted
    and de-duplicated, in ``self.keyword_option``. An empty fragment matches
    every keyword, as before.

    ``self.chat_with_external`` is switched on only when at least one
    non-blank keyword is present. Previously it was always switched on,
    because ``str.split`` never returns an empty list.
    """
    fragments=event.new.split(',')
    options={''}
    for k in fragments:
        # Escape backslash and quote so user text stays inside the Cypher
        # string literal.
        needle=k.upper().replace("\\","\\\\").replace("'","\\'")
        options.update(self.evaluate_query("match (x:Keyword) where toUpper(x.Name) contains '"+needle+"' return collect(x.Name)") )
    self.keyword_option.options=sorted(options)
    self.chat_with_external.value=any(k.strip() for k in fragments)
    return
def display_report(self,event):
if event.new==None:
return
try:
self.floatpanel_report.clear()
#print("clearing the panel")
except:
config = {"headerControls": {"maximize" : "remove","close" : "remove"}}
self.floatpanel_report=pn.layout.FloatPanel(name='Report', contained=False, margin=20,height=900,position="center",config=config)
if event.new=='a':
# Collect all size data per ETG type
startpoint=event.obj.target
#Check if target is IL or IL compound
check=self.evaluate_query("match (t {UID:'"+startpoint+"'}) return t.Name")
if check=='IL':
result=self.evaluate_query("match (t {UID:'"+startpoint+"'})-->(c:Compound)-->(x:LNP) where x.`Malvern_size_(d.nm)`<>'0' return collect([c.Name,toFloat(x.`Malvern_size_(d.nm)`)])")
else:
check=self.evaluate_query("match (t {UID:'"+startpoint+"'})<--(p:Class {Name:'IL'}) return p.UID")
if check!=None:
result=self.evaluate_query("match (t {UID:'"+startpoint+"'})-->(x:LNP) where x.`Malvern_size_(d.nm)`<>'0' return collect([t.Name,toFloat(x.`Malvern_size_(d.nm)`)])")
else:
result=self.evaluate_query("match (t {UID:'"+startpoint+"'})-[*]->(x:LNP)<--(c:Compound)<--(p:Class {Name:'IL'}) where x.`Malvern_size_(d.nm)`<>'0' return collect([c.Name,toFloat(x.`Malvern_size_(d.nm)`)])")
#result=self.graph.run("match (p:Class {Name:'IL'})-->(c:Compound)-->(x) return collect([c.Name,toFloat(x.`Malvern_size_(d.nm)`)])")
df=pd.DataFrame(result,columns=['ETG','Size'])
plot=hv.Violin(data=df,kdims=['ETG'],vdims=['Size']).opts(height=750, width=1000)
if event.new=='b':
# Collect all size data per ETG type
startpoint=event.obj.target
check=self.evaluate_query("match (t {UID:'"+startpoint+"'}) return t.Name")
if check=='IL':
result=self.evaluate_query("match (t {UID:'"+startpoint+"'})-->(c:Compound)-->(x:LNP) where x.`Malvern_size_(d.nm)`<>'0' return collect([c.Name,toFloat(x.`Malvern_size_(d.nm)`)])")
else:
check=self.evaluate_query("match (t {UID:'"+startpoint+"'})<--(p:Class {Name:'IL'}) return p.UID")
if check!=None:
result=self.evaluate_query("match (t {UID:'"+startpoint+"'})-->(x:LNP) where x.`Malvern_size_(d.nm)`<>'0' return collect([t.Name,toFloat(x.`Malvern_size_(d.nm)`)])")
else:
result=self.evaluate_query("match (t {UID:'"+startpoint+"'})-[*]->(x:LNP)<--(c:Compound)<--(p:Class {Name:'IL'}) where x.`Malvern_size_(d.nm)`<>'0' return collect([c.Name,toFloat(x.`Malvern_size_(d.nm)`)])")
#result=self.graph.run("match (p:Class {Name:'IL'})-->(c:Compound)-->(x) return collect([c.Name,toFloat(x.`Malvern_size_(d.nm)`)])")
df=pd.DataFrame(result,columns=['ETG','Size'])
plot=hv.BoxWhisker(data=df,kdims=['ETG'],vdims=['Size']).opts(height=750, width=1000)
if event.new=='c':
# Collect all size data per ETG type
startpoint=event.obj.target
check=self.evaluate_query("match (t {UID:'"+startpoint+"'}) return t.Name")
if check=='IL':
result=self.evaluate_query("match (t {UID:'"+startpoint+"'})-->(c:Compound)-->(x:LNP) where x.`pKa`<>'0' return collect([c.Name,toFloat(x.`pKa`)])")
else:
check=self.evaluate_query("match (t {UID:'"+startpoint+"'})<--(p:Class {Name:'IL'}) return p.UID")
if check!=None:
result=self.evaluate_query("match (t {UID:'"+startpoint+"'})-->(x:LNP) where x.`pKa`<>'0' return collect([t.Name,toFloat(x.`pKa`)])")
else:
result=self.evaluate_query("match (t {UID:'"+startpoint+"'})-[*]->(x:LNP)<--(c:Compound)<--(p:Class {Name:'IL'}) where x.`pKa`<>'0' return collect([c.Name,toFloat(x.`pKa`)])")
#result=self.graph.run("match (p:Class {Name:'IL'})-->(c:Compound)-->(x) return collect([c.Name,toFloat(x.`Malvern_size_(d.nm)`)])")
df=pd.DataFrame(result,columns=['ETG','pKa'])
plot=hv.BoxWhisker(data=df,kdims=['ETG'],vdims=['pKa']).opts(height=750, width=1000)
if event.new=='d':
# Collect all size data per ETG type
startpoint=event.obj.target
check=self.evaluate_query("match (t {UID:'"+startpoint+"'}) return t.Name")
if check=='IL':
result=self.evaluate_query("match (t {UID:'"+startpoint+"'})-->(c:Compound)-->(x:LNP) where x.`Zetapotential_(mV)`<>'0' return collect([c.Name,toFloat(x.`Zetapotential_(mV)`)])")
else:
check=self.evaluate_query("match (t {UID:'"+startpoint+"'})<--(p:Class {Name:'IL'}) return p.UID")
if check!=None:
result=self.evaluate_query("match (t {UID:'"+startpoint+"'})-->(x:LNP) where x.`Zetapotential_(mV)`<>'0' return collect([t.Name,toFloat(x.`Zetapotential_(mV)`)])")
else:
result=self.evaluate_query("match (t {UID:'"+startpoint+"'})-[*]->(x:LNP)<--(c:Compound)<--(p:Class {Name:'IL'}) where x.`Zetapotential_(mV)`<>'0' return collect([c.Name,toFloat(x.`Zetapotential_(mV)`)])")
#result=self.graph.run("match (p:Class {Name:'IL'})-->(c:Compound)-->(x) return collect([c.Name,toFloat(x.`Malvern_size_(d.nm)`)])")
df=pd.DataFrame(result,columns=['ETG','Zeta'])
plot=hv.BoxWhisker(data=df,kdims=['ETG'],vdims=['Zeta']).opts(height=750, width=1000)
if event.new=='e':
# Collect all size data per ETG type
startpoint=event.obj.target
check=self.evaluate_query("match (t {UID:'"+startpoint+"'}) return t.Name")
if check=='IL':
result=self.evaluate_query("match (t {UID:'"+startpoint+"'})-->(c:Compound)-[r]->(x:LNP) where x.`Malvern_size_(d.nm)`<>'0' return collect([c.Name,toInteger(r.Ratio),toFloat(x.`Malvern_size_(d.nm)`)])")
else:
check=self.evaluate_query("match (t {UID:'"+startpoint+"'})<--(p:Class {Name:'IL'}) return p.UID")
if check!=None:
result=self.evaluate_query("match (t {UID:'"+startpoint+"'})-[r]->(x:LNP) where x.`Malvern_size_(d.nm)`<>'0' return collect([t.Name,toInteger(r.Ratio),toFloat(x.`Malvern_size_(d.nm)`)])")
else:
result=self.evaluate_query("match (t {UID:'"+startpoint+"'})-[*]->(x:LNP)<-[r]-(c:Compound)<--(p:Class {Name:'IL'}) where x.`Malvern_size_(d.nm)`<>'0' return collect([c.Name,toInteger(r.Ratio),toFloat(x.`Malvern_size_(d.nm)`)])")
#result=self.graph.run("match (p:Class {Name:'IL'})-->(c:Compound)-->(x) return collect([c.Name,toFloat(x.`Malvern_size_(d.nm)`)])")
df=pd.DataFrame(result,columns=['ETG','Ratio','Size'])
plot=hv.BoxWhisker(data=df,kdims=['ETG','Ratio'],vdims=['Size']).opts(height=750, width=1000)
if event.new=='f':
if event.obj.target=='':
return
startpoint=event.obj.target
connectome=self.find_edges(startpoint)
print("connectome build")
self.node_report_fields=[]
relationships_in_scope=[]
for n in connectome:
nodedata=self.template_tree.get(n,'')
if nodedata!='':
fields=nodedata.get('data_fields',{})
if fields!={}:
for f in fields.keys():
if f!='Keys':
self.node_report_fields.append(['node',n+":"+f])
#Get relevant relationships
relationships_in_scope.extend([i[1] for i in nodedata.get('uplinks')])
relationships_in_scope.extend([i[1] for i in nodedata.get('downlinks')])
for r in set(relationships_in_scope):
nodedata=self.template_tree.get(r,'')
if nodedata!='':
fields=nodedata.get('data_fields',{})
if fields!={}:
for f in fields.keys():
if f!='Keys':
self.node_report_fields.append(['link',r+":"+f])
print("scope build")
self.node_report_fields.sort(key=self.takesecondkey)
#print("prestored values ",self.selected_data,self.selected_categories,self.selected_filter)
options=[i[1] for i in self.node_report_fields]
self.generate_bt=pn.widgets.Button(name="Generate",button_type="primary")
#key_options=sorted(self.graph.run("match (x:Keyword) where not 'Template' in labels(x) return collect (distinct x.Name)") )
self.keywords_box=pn.widgets.TextInput(name="Keywords",value='')
self.keyword_option=pn.widgets.Select(name="keyword hints",value='',options=[''])
self.query_box=pn.widgets.TextAreaInput(name="Query",value=self.llm_query,auto_grow=True,resizable='both',max_rows=10, rows=6)
self.keyword_option.param.watch(self.push_keyword, ['value'], onlychanged=True)
self.keywords_box.param.watch(self.hint_keyword, ['value'], onlychanged=True)
self.keywords_box.value=self.keywords_selected
if self.selected_data!=None:
select_value=pn.widgets.MultiChoice(name="value",options=options,value=self.selected_data,search_option_limit=50,max_items=1)
self.generate_bt.datavalue=self.selected_data
else:
select_value=pn.widgets.MultiChoice(name="value",options=options,search_option_limit=50,max_items=1)
self.generate_bt.datavalue=[]
if self.selected_categories!={}:
values=[i for i in self.selected_categories.keys()]
select_categories=pn.widgets.MultiChoice(name="categories",options=options,value=values,search_option_limit=50)
select_categories.watch_type="field"
select_categories.param.watch(self.categories_update, ['value'], onlychanged=True)
#self.generate_bt.categories=self.selected_categories
else:
select_categories=pn.widgets.MultiChoice(name="categories",options=options,search_option_limit=50)
select_categories.watch_type="field"
select_categories.param.watch(self.categories_update, ['value'], onlychanged=True)
#self.generate_bt.categories=[]
categories=pn.Column()
for f in self.selected_categories.keys():
cat_options=["any"]
cat_options.extend([i[1] for i in self.template_tree.get(f.split(':')[0],{}).get('downlinks',[])])
filter_value=pn.widgets.Select(name=f.split(':')[0]+":"+f.split(':')[1],options=cat_options,value=self.selected_categories[f])
#filter_value,_,_=self.choose_filter_widget(f.split(':')[0],f.split(':')[1],self.selected_categories[f])
filter_value.watch_type="value"
filter_value.field=f
filter_value.param.watch(self.categories_update, ['value'], onlychanged=True)
categories.append(filter_value)
if self.selected_filter!={}:
values=[i for i in self.selected_filter.keys()]
select_filters=pn.widgets.MultiChoice(name="filters",options=options,value=values,search_option_limit=50)
select_filters.watch_type="field"
select_filters.param.watch(self.filter_update, ['value'], onlychanged=True)
#self.generate_bt.filters=self.selected_filter
else:
select_filters=pn.widgets.MultiChoice(name="filters",options=options,search_option_limit=50)
select_filters.watch_type="field"
select_filters.param.watch(self.filter_update, ['value'], onlychanged=True)
#self.generate_bt.filters=[]
filters=pn.Column()
for f in self.selected_filter.keys():
filter_value,_,_=self.choose_filter_widget(f.split(':')[0],f.split(':')[1],self.selected_filter[f])
filter_value.watch_type="value"
filter_value.field=f
filter_value.param.watch(self.filter_update, ['value'], onlychanged=True)
filters.append(filter_value)
self.generate_bt.target=[startpoint]
select_value.link(self.generate_bt, value='datavalue')
#select_categories.link(self.generate_bt,value='categories')
#select_filters.link(self.generate_bt,value='filters')
self.generate_bt.on_click(self.generate_report)
report_items = ['Boxplot', 'Violin','Dataframe','LLM']
report_type=pn.widgets.Select(name="Report type", options=report_items,value='Boxplot')
report_type.link(self.generate_bt, value='report_type')
self.generate_bt.report_type='Boxplot'
close_bt=pn.widgets.Button(name="close panel",button_type="danger")
close_bt.window="self.workspace"
close_bt.floater="self.floatpanel_report"
close_bt.on_click(self.floater_close)
self.floatpanel_report.append(close_bt)
if event.new=='g':
if event.obj.target=='':
return
startpoint=event.obj.target
edges=self.evaluate_query("match (t {UID:'"+startpoint+"'})-[*]->(x:LNP) return collect(distinct x.UID)")
# We will collect a number of key parameters that define the experimental space
mrnas=self.evaluate_query("match (:Class {Name:'mRNA'})-->(c:mRNA)-->(x) where x.UID in ['"+"','".join(edges)+"'] return collect (distinct c.Name)")
ils=self.evaluate_query("match (:Class {Name:'IL'})-->(c:Compound)-->(x) where x.UID in ['"+"','".join(edges)+"'] return collect (distinct c.Name)")
hls=self.evaluate_query("match (:Class {Name:'helper lipids'})-->(c:Compound)-->(x) where x.UID in ['"+"','".join(edges)+"'] return collect (distinct c.Name)")
buffers=self.evaluate_query("match (:Class {Name:'buffers'})-->(c:Compound)-[:BUFFER]->(x) where x.UID in ['"+"','".join(edges)+"'] return collect (distinct c.Name)")
mixers=self.evaluate_query("match (:Class {Name:'buffers'})-->(c:Compound)-[:MIXING]->(x) where x.UID in ['"+"','".join(edges)+"'] return collect (distinct c.Name)")
ilratios=self.evaluate_query("match (:Class {Name:'IL'})-->(c:Compound)-[r]->(x) where x.UID in ['"+"','".join(edges)+"'] return collect (distinct r.Ratio)")
payloads=self.evaluate_query("match (:Class {Name:'mRNA'})-->(c:mRNA)-[r]->(x) where x.UID in ['"+"','".join(edges)+"'] return collect (distinct r.`Amount_[µg]`)")
tfrs=self.evaluate_query("match (e:Experiment)-[r]->(x) where x.UID in ['"+"','".join(edges)+"'] return collect (distinct r.TFR)")
frrs=self.evaluate_query("match (e:Experiment)-[r]->(x) where x.UID in ['"+"','".join(edges)+"'] return collect (distinct r.FRR)")
pkas=self.evaluate_query("match (x) where x.UID in ['"+"','".join(edges)+"'] return collect (distinct x.pKa)")
sizes=self.evaluate_query("match (x) where x.UID in ['"+"','".join(edges)+"'] return collect (distinct x.`Malvern_size_(d.nm)`)")
mrnas.sort()
ils.sort()
hls.sort()
buffers.sort()
mixers.sort()
ilratios.sort(key=self.safe_float)
payloads.sort(key=self.safe_float)
tfrs.sort(key=self.safe_float)
frrs.sort(key=self.safe_float)
pkas.sort(key=self.safe_float)
sizes.sort(key=self.safe_float)
self.expspace={}
self.expspace['ils']=[(i,'no') for i in ils]
self.expspace['mrnas']=[(i,'no') for i in mrnas]
self.expspace['hls']=[(i,'no') for i in hls]
self.expspace['buffers']=[(i,'no') for i in buffers]
self.expspace['mixers']=[(i,'no') for i in mixers]
self.expspace['ilratios']=[(i,'no') for i in ilratios]
self.expspace['payloads']=[(i,'no') for i in payloads]
self.expspace['tfrs']=[(i,'no') for i in tfrs]
self.expspace['frrs']=[(i,'no') for i in frrs]
self.expspace['pkas']=[(i,'no') for i in pkas]
self.expspace['sizes']=[(i,'no') for i in sizes]
start_name=self.evaluate_query("match (x {UID:'"+startpoint+"'}) return x.Name")
html_pane = pn.pane.HTML("<h1>Experimental space from "+start_name+"</h1>")
self.floatpanel_report.append(html_pane)
self.floatpanel_report.append(self.button_row_generate("mRNA","mrnas",startpoint))
self.floatpanel_report.append(self.button_row_generate("IL","ils",startpoint))
self.floatpanel_report.append(self.button_row_generate("Helper Lipid","hls",startpoint))
self.floatpanel_report.append(self.button_row_generate("Buffer","buffers",startpoint))
self.floatpanel_report.append(self.button_row_generate("Mixing","mixers",startpoint))
self.floatpanel_report.append(self.button_row_generate("IL Ratios","ilratios",startpoint))
self.floatpanel_report.append(self.button_row_generate("Payload","payloads",startpoint))
self.floatpanel_report.append(self.button_row_generate("TFR","tfrs",startpoint))
self.floatpanel_report.append(self.button_row_generate("FRR","frrs",startpoint))
self.floatpanel_report.append(self.button_row_generate("pKa obtained","pkas",startpoint))
self.floatpanel_report.append(self.button_row_generate("Malvern Sizes obtained","sizes",startpoint))
if event.new!='f' and event.new!='g':
self.floatpanel_report.append(plot)
elif event.new!='g':
self.floatpanel_report.append(pn.Column(select_value,select_categories,categories,select_filters,filters,pn.Row(self.keywords_box,self.keyword_option),self.query_box,report_type,self.generate_bt))
self.workspace.append(self.floatpanel_report)
return
def filter_update(self,event):
    """
    Keep ``self.selected_filter`` in step with a filter widget and redraw the value widgets.

    Parameters
    ----------
    event : param event
        ``event.obj.watch_type`` selects the mode. With ``"field"`` the event
        carries the list of chosen filter fields in ``event.new``; otherwise
        ``event.new`` is the new value for the single field named by
        ``event.obj.field``.

    Notes
    -----
    The rebuilt column is written to ``self.floatpanel_report[-1][4]``.
    """
    source = event.obj
    if source.watch_type == "field":
        chosen = event.new
        # A newly chosen field starts with an empty filter value; existing values are kept.
        for name in chosen:
            self.selected_filter.setdefault(name, '')
        # Forget every field that is no longer part of the selection.
        for stale in [key for key in self.selected_filter if key not in chosen]:
            self.selected_filter.pop(stale, None)
    else:
        self.selected_filter[source.field] = event.new
    # Rebuild one value widget per active filter; keys look like "<template>:<field>".
    rebuilt = pn.Column()
    for key in self.selected_filter.keys():
        parts = key.split(':')
        entry, _, _ = self.choose_filter_widget(parts[0], parts[1], self.selected_filter[key])
        entry.watch_type = "value"
        entry.field = key
        entry.param.watch(self.filter_update, ['value'], onlychanged=True)
        rebuilt.append(entry)
    self.floatpanel_report[-1][4] = rebuilt
    return
def filter_update_chat(self,event):
    """
    Variant of the filter synchronisation that redraws the widgets inside ``self.upnav``.

    Parameters
    ----------
    event : param event
        When ``event.obj.watch_type`` is ``"field"``, ``event.new`` is the list
        of chosen filter fields. Otherwise ``event.new`` is the new value for
        the field stored in ``event.obj.field``.

    Notes
    -----
    The rebuilt column replaces ``self.upnav[-1][0][4]``.
    """
    sender = event.obj
    if sender.watch_type != "field":
        # A single value widget changed: store its new value.
        self.selected_filter[sender.field] = event.new
    else:
        wanted = event.new
        for name in wanted:
            # Only fields that are new to the selection get the empty default.
            self.selected_filter.setdefault(name, '')
        dropped = [key for key in self.selected_filter if key not in wanted]
        for key in dropped:
            self.selected_filter.pop(key, None)
    # Recreate the value widgets so they match the current filter dictionary.
    column = pn.Column()
    for key in self.selected_filter.keys():
        pieces = key.split(':')
        widget, _, _ = self.choose_filter_widget(pieces[0], pieces[1], self.selected_filter[key])
        widget.watch_type = "value"
        widget.field = key
        widget.param.watch(self.filter_update_chat, ['value'], onlychanged=True)
        column.append(widget)
    self.upnav[-1][0][4] = column
    return
def categories_update(self,event):
    """
    Keep ``self.selected_categories`` in step with a category widget and redraw the selectors.

    Parameters
    ----------
    event : param event
        With ``event.obj.watch_type == "field"`` the event carries the list of
        chosen category fields in ``event.new``. Otherwise ``event.new`` is the
        option picked for the field named by ``event.obj.field``.

    Notes
    -----
    Each selector offers ``"any"`` plus the second item of every ``downlinks``
    entry of the field's template in ``self.template_tree``. The rebuilt
    column is written to ``self.floatpanel_report[-1][2]``.
    """
    source = event.obj
    if source.watch_type == "field":
        picked = event.new
        for name in picked:
            # Missing or empty entries fall back to the catch-all choice.
            if self.selected_categories.get(name, '') == '':
                self.selected_categories[name] = 'any'
        for key in list(self.selected_categories.keys()):
            if key not in picked:
                self.selected_categories.pop(key, None)
    else:
        self.selected_categories[source.field] = event.new
    # One Select per category key; keys look like "<template>:<field>".
    selectors = pn.Column()
    for key in self.selected_categories.keys():
        template = key.split(':')[0]
        choices = ["any"]
        for link in self.template_tree.get(template, {}).get('downlinks', []):
            choices.append(link[1])
        selector = pn.widgets.Select(name=template+":"+key.split(':')[1],options=choices,value=self.selected_categories[key])
        selector.watch_type = "value"
        selector.field = key
        selector.param.watch(self.categories_update, ['value'], onlychanged=True)
        selectors.append(selector)
    self.floatpanel_report[-1][2] = selectors
    return
def categories_update_chat(self,event):
    """
    Variant of the category synchronisation that redraws the selectors inside ``self.upnav``.

    Parameters
    ----------
    event : param event
        When ``event.obj.watch_type`` is ``"field"``, ``event.new`` is the list
        of chosen category fields. Otherwise ``event.new`` is the option picked
        for the field stored in ``event.obj.field``.

    Notes
    -----
    The rebuilt column replaces ``self.upnav[-2][0][2]``.
    NOTE(review): the sibling filter handler writes to ``self.upnav[-1]``
    while this one writes to ``self.upnav[-2]`` - confirm this is intended.
    """
    sender = event.obj
    if sender.watch_type != "field":
        # A single selector changed: store the picked option.
        self.selected_categories[sender.field] = event.new
    else:
        wanted = event.new
        for name in wanted:
            # Missing or empty entries are initialised to the catch-all choice.
            if self.selected_categories.get(name, '') == '':
                self.selected_categories[name] = 'any'
        dropped = [key for key in self.selected_categories.keys() if key not in wanted]
        for key in dropped:
            self.selected_categories.pop(key, None)
    # Recreate the selectors so they match the current category dictionary.
    column = pn.Column()
    for key in self.selected_categories.keys():
        template = key.split(':')[0]
        downlinks = self.template_tree.get(template, {}).get('downlinks', [])
        choices = ["any"] + [link[1] for link in downlinks]
        widget = pn.widgets.Select(name=template+":"+key.split(':')[1],options=choices,value=self.selected_categories[key])
        widget.watch_type = "value"
        widget.field = key
        widget.param.watch(self.categories_update_chat, ['value'], onlychanged=True)
        column.append(widget)
    self.upnav[-2][0][2] = column
    return
def convert_data(self,data,datatype):
    """
    Coerce a raw property value to the type named by ``datatype``.

    Parameters
    ----------
    data : object
        The raw value; ``None`` is treated as ``"0"`` for numeric types and
        as ``""`` otherwise.
    datatype : str
        ``'float'`` and ``'int'`` trigger a numeric conversion through
        ``self.safe_float`` / ``self.safe_int``; any other value returns the
        data unchanged.

    Returns
    -------
    object
        The result of ``self.safe_float`` / ``self.safe_int`` for numeric
        types, otherwise ``data`` itself (or ``""`` when it was ``None``).
    """
    if datatype in ('float', 'int'):
        text = "0" if data is None else data
        # Values that are already numeric have no .replace(); stringify them
        # first so the decimal-comma normalisation below cannot raise.
        if not isinstance(text, str):
            text = str(text)
        # Accept a decimal comma by turning it into a decimal point.
        text = text.replace(',', '.')
        if datatype == 'float':
            return self.safe_float(text)
        return self.safe_int(text)
    return "" if data is None else data
def extend_tree(self,start,target,data_type):
    """
    Build the Cypher pattern prefix that leads from ``target`` down to ``start``.

    The ``uplinks`` of ``start`` in ``self.template_tree`` are searched first;
    if none matches, the search recurses through each uplink's first item.

    Parameters
    ----------
    start : str
        Key into ``self.template_tree`` whose ``uplinks`` are inspected.
    target : str
        Name to find: compared with the second item of each uplink when
        ``data_type`` is ``'link'``, with the first item otherwise.
    data_type : str
        ``'link'`` to bind ``x`` to a relationship, anything else to bind it
        to a node.

    Returns
    -------
    str
        A pattern fragment ending in an arrow, with ``x`` bound to the target,
        or ``""`` when the target cannot be reached.
    """
    parents = self.template_tree[start].get('uplinks', [])
    by_link = data_type == 'link'
    slot = 1 if by_link else 0
    # Direct hit: the target is one of the immediate uplinks.
    if any(parent[slot] == target for parent in parents):
        if by_link:
            return "()-[x:"+target+"]->"
        return "(x:"+target+")-->"
    # Otherwise climb one level at a time; these templates are never traversed.
    excluded = ['People','Customers','Projects','Equipment','Compounds','Configuration']
    for parent in parents:
        label = parent[0]
        if label in excluded:
            continue
        prefix = self.extend_tree(label,target,data_type)
        if prefix != "":
            # First successful branch wins; append the hop through this parent.
            return prefix+"(:"+label+")-->"
    return ""
def generate_sample_report(self,event):
    """
    Collect the selected data field below the clicked start nodes and show it in the report panel.

    Parameters
    ----------
    event : click event
        ``event.obj.datavalue`` holds the selected ``"<template>:<field>"``
        entry (or a list whose first item is that entry) and
        ``event.obj.target`` an iterable of start-node UIDs.

    Notes
    -----
    The data points are gathered in ``self.core_data`` as ``[value, UID]``
    pairs, restricted to items whose ``Keys`` property shares a key with the
    current user's write or read keys and that satisfy every entry of
    ``self.selected_filter``. One extra column is added per entry of
    ``self.selected_categories``. The result is stored in ``self.report_df``
    and rendered through ``self.report_choice`` with report type ``'c'``.
    Any error while collecting the data produces an empty report instead of
    failing the callback.
    """
    try:
        self.floatpanel_report.clear()
    except Exception:
        # No usable panel yet (for instance on the first call): create one.
        config = {"headerControls": {"maximize" : "remove","close" : "remove"}}
        self.floatpanel_report=pn.layout.FloatPanel(name='Report', contained=False, margin=20,height=900,position="center",config=config)
    self.selected_data=event.obj.datavalue
    report_type='c'
    startpoint=event.obj.target
    # The widget may deliver either a list of selections or a bare value.
    try:
        selected_data=self.selected_data[0]
    except (TypeError, IndexError, KeyError):
        selected_data=self.selected_data
    print("startpoint is ",startpoint)

    def has_access(keys):
        # Write access is checked first, read access second; either is enough.
        if any(k in self.current_user.all_write_keys for k in keys):
            return True
        return any(k in self.current_user.all_read_keys for k in keys)

    # Step 1 - the core data
    self.core_data=[]
    try:
        data_owner=selected_data.split(':')[0]
        data_field=selected_data.split(':')[1]
        data_type=json.loads(self.template_tree[data_owner]['data_fields'][data_field])['type']
        for x in self.node_report_fields:
            if x[1]==selected_data:
                data_source_type=x[0]
        if data_source_type=="link":
            # The field lives on a relationship: find the template that has
            # this relationship among its uplinks (the last match wins).
            for k in self.template_tree.keys():
                if data_owner in [i[1] for i in self.template_tree[k].get('uplinks',[])]:
                    link_node=k
            result=[]
            for s in startpoint:
                result.extend(self.evaluate_query("match (t {UID:'"+s+"'})-[*]->(x:"+link_node+")<-[r:"+data_owner+"]-() return collect(distinct [x,r])"))
            for n in result:
                # n is a [node, relationship] pair.
                if not has_access(n[0].get('Keys','').split(',')):
                    continue
                keep=True
                for f in self.selected_filter.keys():
                    for x in self.node_report_fields:
                        if x[1]==f.split(':')[0]+":"+f.split(':')[1]:
                            data_source_type3=x[0]
                    # NOTE(review): if extend_tree returns "" the pattern below has no
                    # `x` binding; the category step handles that case, this one does not.
                    out=self.extend_tree(link_node,f.split(':')[0],data_source_type3)
                    filter_result=self.evaluate_query("match "+out+"(t {UID:'"+n[0]['UID']+"'}) where x.`"+f.split(':')[1]+"`='"+str(self.selected_filter[f])+"' return x limit 1")
                    if filter_result is None:
                        keep=False
                if keep:
                    self.core_data.append([self.convert_data(n[1][data_field],data_type),n[0]['UID']])
        else:
            link_node=data_owner
            result=[]
            for s in startpoint:
                result.extend(self.evaluate_query("match (t {UID:'"+s+"'})-[*]->(x:"+link_node+") return collect(distinct x)"))
            for n in result:
                if not has_access(n.get('Keys','').split(',')):
                    continue
                keep=True
                for f in self.selected_filter.keys():
                    for x in self.node_report_fields:
                        if x[1]==f.split(':')[0]+":"+f.split(':')[1]:
                            data_source_type3=x[0]
                    out=self.extend_tree(link_node,f.split(':')[0],data_source_type3)
                    # The node is addressed by its UID property (n is the node itself,
                    # not a pair), matching the node branch of generate_report.
                    filter_result=self.evaluate_query("match "+out+"(t {UID:'"+n['UID']+"'}) where x.`"+f.split(':')[1]+"` contains '"+str(self.selected_filter[f])+"' return x limit 1")
                    if filter_result is None:
                        keep=False
                # Empty and zero readings are not reported for node fields.
                if keep and not(n[data_field] in ['','0','0.0']):
                    self.core_data.append([self.convert_data(n[data_field],data_type),n['UID']])
        # Step 2 - we assemble category data going up from the core data nodes
        category_data=[]
        for c in self.selected_categories.keys():
            column=[]
            for x in self.node_report_fields:
                if x[1]==c:
                    data_source_type2=x[0]
            data_type=json.loads(self.template_tree[c.split(':')[0]]['data_fields'][c.split(':')[1]])['type']
            out=self.extend_tree(link_node,c.split(':')[0],data_source_type2)
            wanted=self.selected_categories[c]
            returning=" return x.`"+c.split(':')[1]+"` limit 1"
            for n in self.core_data:
                if out=='':
                    # The category field sits on the data node itself.
                    pattern="match (x {UID:'"+n[1]+"'})"
                    if wanted!="any":
                        pattern=pattern+"-[:"+wanted+"]->()"
                else:
                    pattern="match "+out+"(t {UID:'"+n[1]+"'})"
                    if wanted!="any" and data_source_type2=='node':
                        pattern=pattern+" where exists((x)-[:"+wanted+"]->())"
                result=self.evaluate_query(pattern+returning)
                column.append(self.convert_data(result,data_type))
            category_data.append(column)
        # One row per data point: UID, value, then one cell per category.
        total_data=[]
        for i,core in enumerate(self.core_data):
            row=[core[1],core[0]]
            row.extend(column[i] for column in category_data)
            total_data.append(row)
        data_columns=["UID","`"+selected_data+"`"]
        self.plot_kdims=["`"+c+"`" for c in self.selected_categories.keys()]
        self.plot_vdims=["`"+selected_data+"`"]
        data_columns.extend(self.plot_kdims)
    except Exception as exc:
        # Best effort: report the problem and fall back to an empty report.
        print("report data collection failed: ",exc)
        total_data=[]
        data_columns=["UID"]
        self.plot_vdims=[]
        self.plot_kdims=[]
    self.report_df=pd.DataFrame(total_data,columns=data_columns)
    plot=self.report_choice(report_type)
    self.floatpanel_report.clear()
    close_bt=pn.widgets.Button(name="close panel",button_type="success")
    close_bt.window="self.workspace"
    close_bt.floater="self.floatpanel_report"
    close_bt.on_click(self.floater_close)
    self.floatpanel_report.append(close_bt)
    if self.selected_filter=={}:
        html_pane = pn.pane.HTML("<h1>Report : data points :"+str(len(self.core_data))+"</h1>")
    else:
        # Filter values are not necessarily strings, so stringify before joining.
        filterstring="".join(" "+k+":"+str(v) for k,v in self.selected_filter.items())
        html_pane = pn.pane.HTML("<h1>Report : data points :"+str(len(self.core_data))+" Filters:"+filterstring+"</h1>")
    self.floatpanel_report.append(html_pane)
    self.floatpanel_report.append(plot)
    self.workspace.append(self.floatpanel_report)
    # Chatting with internal data only makes sense when there is some.
    self.chat_with_internal.value=bool(total_data)
    return
def retrieve_internal_data(self):
    """
    Rebuild ``self.report_df`` from the current selection without touching the UI.

    The selection is read from ``self.generate_bt`` (``datavalue`` and
    ``target``), ``self.selected_filter`` and ``self.selected_categories``.

    Notes
    -----
    The data points are gathered in ``self.core_data`` as ``[value, UID]``
    pairs, restricted to items whose ``Keys`` property shares a key with the
    current user's write or read keys and that satisfy every selected filter.
    One extra column is added per selected category. Any error while
    collecting the data leaves an empty frame with the single column
    ``"UID"`` and empty ``plot_kdims`` / ``plot_vdims``.
    """

    def has_access(keys):
        # Write access is checked first, read access second; either is enough.
        if any(k in self.current_user.all_write_keys for k in keys):
            return True
        return any(k in self.current_user.all_read_keys for k in keys)

    self.core_data=[]
    try:
        self.selected_data=self.generate_bt.datavalue
        startpoint=self.generate_bt.target
        # The widget may deliver either a list of selections or a bare value.
        try:
            selected_data=self.selected_data[0]
        except (TypeError, IndexError, KeyError):
            selected_data=self.selected_data
        data_owner=selected_data.split(':')[0]
        data_field=selected_data.split(':')[1]
        data_type=json.loads(self.template_tree[data_owner]['data_fields'][data_field])['type']
        for x in self.node_report_fields:
            if x[1]==selected_data:
                data_source_type=x[0]
        if data_source_type=="link":
            # The field lives on a relationship: find the template that has
            # this relationship among its uplinks (the last match wins).
            for k in self.template_tree.keys():
                if data_owner in [i[1] for i in self.template_tree[k].get('uplinks',[])]:
                    link_node=k
            result=[]
            for s in startpoint:
                result.extend(self.evaluate_query("match (t {UID:'"+s+"'})-[*]->(x:"+link_node+")<-[r:"+data_owner+"]-() return collect(distinct [x,r])"))
            for n in result:
                # n is a [node, relationship] pair.
                if not has_access(n[0].get('Keys','').split(',')):
                    continue
                keep=True
                for f in self.selected_filter.keys():
                    for x in self.node_report_fields:
                        if x[1]==f.split(':')[0]+":"+f.split(':')[1]:
                            data_source_type3=x[0]
                    # NOTE(review): if extend_tree returns "" the pattern below has no
                    # `x` binding; the category step handles that case, this one does not.
                    out=self.extend_tree(link_node,f.split(':')[0],data_source_type3)
                    filter_result=self.evaluate_query("match "+out+"(t {UID:'"+n[0]['UID']+"'}) where x.`"+f.split(':')[1]+"`='"+str(self.selected_filter[f])+"' return x limit 1")
                    if filter_result is None:
                        keep=False
                if keep:
                    self.core_data.append([self.convert_data(n[1][data_field],data_type),n[0]['UID']])
        else:
            link_node=data_owner
            result=[]
            for s in startpoint:
                result.extend(self.evaluate_query("match (t {UID:'"+s+"'})-[*]->(x:"+link_node+") return collect(distinct x)"))
            for n in result:
                if not has_access(n.get('Keys','').split(',')):
                    continue
                keep=True
                for f in self.selected_filter.keys():
                    for x in self.node_report_fields:
                        if x[1]==f.split(':')[0]+":"+f.split(':')[1]:
                            data_source_type3=x[0]
                    out=self.extend_tree(link_node,f.split(':')[0],data_source_type3)
                    # The node is addressed by its UID property (n is the node itself,
                    # not a pair), matching the node branch of generate_report.
                    filter_result=self.evaluate_query("match "+out+"(t {UID:'"+n['UID']+"'}) where x.`"+f.split(':')[1]+"` contains '"+str(self.selected_filter[f])+"' return x limit 1")
                    if filter_result is None:
                        keep=False
                # Empty and zero readings are not reported for node fields.
                if keep and not(n[data_field] in ['','0','0.0']):
                    self.core_data.append([self.convert_data(n[data_field],data_type),n['UID']])
        # Step 2 - we assemble category data going up from the core data nodes
        category_data=[]
        for c in self.selected_categories.keys():
            column=[]
            for x in self.node_report_fields:
                if x[1]==c:
                    data_source_type2=x[0]
            data_type=json.loads(self.template_tree[c.split(':')[0]]['data_fields'][c.split(':')[1]])['type']
            out=self.extend_tree(link_node,c.split(':')[0],data_source_type2)
            wanted=self.selected_categories[c]
            returning=" return x.`"+c.split(':')[1]+"` limit 1"
            for n in self.core_data:
                if out=='':
                    # The category field sits on the data node itself.
                    pattern="match (x {UID:'"+n[1]+"'})"
                    if wanted!="any":
                        pattern=pattern+"-[:"+wanted+"]->()"
                else:
                    pattern="match "+out+"(t {UID:'"+n[1]+"'})"
                    if wanted!="any" and data_source_type2=='node':
                        pattern=pattern+" where exists((x)-[:"+wanted+"]->())"
                result=self.evaluate_query(pattern+returning)
                column.append(self.convert_data(result,data_type))
            category_data.append(column)
        # One row per data point: UID, value, then one cell per category.
        total_data=[]
        for i,core in enumerate(self.core_data):
            row=[core[1],core[0]]
            row.extend(column[i] for column in category_data)
            total_data.append(row)
        data_columns=["UID","`"+selected_data+"`"]
        self.plot_kdims=["`"+c+"`" for c in self.selected_categories.keys()]
        self.plot_vdims=["`"+selected_data+"`"]
        data_columns.extend(self.plot_kdims)
    except Exception as exc:
        # Best effort: report the problem and fall back to an empty frame.
        print("internal data retrieval failed: ",exc)
        total_data=[]
        data_columns=["UID"]
        self.plot_vdims=[]
        self.plot_kdims=[]
    self.report_df=pd.DataFrame(total_data,columns=data_columns)
    return
def return_selected_keywords(self):
    """
    Resolve the comma-separated text of the keyword box to keyword names.

    The box content is stored in ``self.keywords_selected``. Each
    comma-separated fragment is matched case-insensitively (``contains``)
    against ``Keyword`` names, and the sorted unique matches are then used in
    a second query over ``SYN`` relationships.

    Returns
    -------
    list
        The result of the ``SYN`` query followed by the sorted matched names.

    Notes
    -----
    Fragments and names are escaped before being placed inside Cypher string
    literals, so an apostrophe or backslash in the input cannot terminate the
    literal.
    NOTE(review): the ``SYN`` query returns ``k.Name`` (the names that were
    passed in), not ``x.Name`` - confirm whether the synonym side was meant.
    """
    def quote(text):
        # Backslashes first, otherwise the escapes added for quotes get doubled.
        return text.replace("\\", "\\\\").replace("'", "\\'")

    self.keywords_selected=self.keywords_box.value
    found=[]
    for fragment in self.keywords_selected.split(","):
        found.extend(self.evaluate_query("match (x:Keyword) where toUpper(x.Name) contains '"+quote(fragment.upper())+"' return collect(x.Name)"))
    keyword=sorted(set(found))
    syns=self.evaluate_query("match (k:Keyword)-[:SYN]-(x:Keyword) where k.Name in ['"+"','".join(quote(name) for name in keyword)+"'] return collect(k.Name)")
    syns.extend(keyword)
    return syns
def retrieve_lit_chunks(self):
    """
    Fetch the literature chunks referred to by the keywords in the keyword box.

    The box content is stored in ``self.keywords_selected`` and resolved to
    keyword names the same way as in ``return_selected_keywords``. Text and
    table chunks that a matching ``Keyword`` ``REFERS`` to are then queried
    together with the name of their ``Publication``.

    Returns
    -------
    list
        One ``Document`` per unique (kind, publication name, text) triple, in
        first-seen order. Chunks without text are skipped.

    Notes
    -----
    Names are escaped before being placed inside Cypher string literals.
    Duplicates are removed on the triples themselves, so chunk text that
    happens to contain a separator sequence is left intact.
    NOTE(review): the ``SYN`` query returns ``k.Name`` (the names that were
    passed in), not ``x.Name`` - confirm whether the synonym side was meant.
    """
    def quote(text):
        # Backslashes first, otherwise the escapes added for quotes get doubled.
        return text.replace("\\", "\\\\").replace("'", "\\'")

    self.keywords_selected=self.keywords_box.value
    found=[]
    for fragment in self.keywords_selected.split(","):
        found.extend(self.evaluate_query("match (x:Keyword) where toUpper(x.Name) contains '"+quote(fragment.upper())+"' return collect(x.Name)"))
    keyword=sorted(set(found))
    syns=self.evaluate_query("match (k:Keyword)-[:SYN]-(x:Keyword) where k.Name in ['"+"','".join(quote(name) for name in keyword)+"'] return collect(k.Name)")
    syns.extend(keyword)
    print("number of syn keywords ",len(syns))
    name_list="','".join(quote(name) for name in syns)
    text_meta=self.evaluate_query("match (y:Publication)-->(x:Text_chunk)<-[:REFERS]-(k:Keyword) where k.Name in ['"+name_list+"'] return collect(['Text',y.Name,x.Text])")
    print("block found ",len(text_meta))
    text_meta.extend(self.evaluate_query("match (y:Publication)-->(x:Table_chunk)<-[:REFERS]-(k:Keyword) where k.Name in ['"+name_list+"'] return collect(['Table',y.Name,x.Text])"))
    print("block found including tables ",len(text_meta))
    # The same chunk is returned once per referring keyword; keep one copy of
    # each (kind, source, text) triple while preserving the query order.
    unique=list(dict.fromkeys((row[0],row[1],row[2]) for row in text_meta))
    docs=[Document(page_content=row[2]) for row in unique if row[2] is not None]
    return docs
def generate_report(self,event):
self.selected_data=event.obj.datavalue
#self.selected_categories=event.obj.categories
report_type='a'
if event.obj.report_type=='Boxplot':
report_type='a'
if event.obj.report_type=='Violin':
report_type='b'
if event.obj.report_type=='Dataframe':
report_type='c'
if event.obj.report_type=='LLM':
report_type='d'
#self.selected_filter=event.obj.filters
startpoint=event.obj.target
#print("returned values ",self.selected_data,self.selected_categories,self.selected_filter)
try:
selected_data=self.selected_data[0]
except:
selected_data=self.selected_data
self.llm_query=self.query_box.value
self.keywords_selected=self.keywords_box.value
#print(self.node_report_fields)
# Build a query string based on the inputs
# Step 1 - the core data
self.core_data=[]
try:
data_type=json.loads(self.template_tree[selected_data.split(':')[0]]['data_fields'][selected_data.split(':')[1]])['type']
for x in self.node_report_fields:
if x[1]==selected_data:
data_source_type=x[0]
if data_source_type=="link":
for k in self.template_tree.keys():
if selected_data.split(':')[0] in [i[1] for i in self.template_tree[k].get('uplinks',[])]:
link_node=k
result=[]
for s in startpoint:
part_result=self.evaluate_query("match (t {UID:'"+s+"'})-[*]->(x:"+link_node+")<-[r:"+selected_data.split(':')[0]+"]-() \
return collect(distinct [x,r])")
result.extend(part_result)
#print("initial query with link field returned ",len(result))
for n in result:
filter=True
# Make sure user has access to this data item - x
target_keys=n[0].get('Keys','').split(',')
access_state=None
for k in target_keys:
if k in self.current_user.all_write_keys:
access_state="W"
if access_state==None:
for k in target_keys:
if k in self.current_user.all_read_keys:
access_state="R"
if access_state!=None:
for f in self.selected_filter.keys():
for x in self.node_report_fields:
if x[1]==f.split(':')[0]+":"+f.split(':')[1]:
data_source_type3=x[0]
#data_type_filter=json.loads(self.template_tree[f.split(':')[0]]['data_fields'][f.split(':')[1]])['type']
#print("entering loop with ",link_node,f.split(':')[0],data_source_type3)
out=self.extend_tree(link_node,f.split(':')[0],data_source_type3)
#print(out)
filter_result=self.evaluate_query("match "+out+"(t {UID:'"+n[0]['UID']+"'}) where x.`"+f.split(':')[1]+"`='"+str(self.selected_filter[f])+"' return x limit 1")
if filter_result==None:
filter=False
if filter:
self.core_data.append([self.convert_data(n[1][selected_data.split(':')[1]],data_type),n[0]['UID']])
else:
link_node=selected_data.split(':')[0]
result=[]
for s in startpoint:
part_result=self.evaluate_query("match (t {UID:'"+s+"'})-[*]->(x:"+link_node+") return collect(distinct x)")
result.extend(part_result)
#print("initial query with node field returned ",len(result))
for n in result:
filter=True
target_keys=n.get('Keys','').split(',')
access_state=None
for k in target_keys:
if k in self.current_user.all_write_keys:
access_state="W"
if access_state==None:
for k in target_keys:
if k in self.current_user.all_read_keys:
access_state="R"
if access_state!=None:
for f in self.selected_filter.keys():
for x in self.node_report_fields:
if x[1]==f.split(':')[0]+":"+f.split(':')[1]:
data_source_type3=x[0]
#data_type_filter_type=json.loads(self.template_tree[f.split(':')[0]]['data_fields'][f.split(':')[1]])['type']
#print("entering loop with ",link_node,f.split(':')[0],data_source_type3)
out=self.extend_tree(link_node,f.split(':')[0],data_source_type3)
#print(out)
filter_result=self.evaluate_query("match "+out+"(t {UID:'"+n['UID']+"'}) where x.`"+f.split(':')[1]+"` contains '"+str(self.selected_filter[f])+"' return x limit 1")
if filter_result==None:
filter=False
if filter and not(n[selected_data.split(':')[1]] in ['','0','0.0']):
self.core_data.append([self.convert_data(n[selected_data.split(':')[1]],data_type),n['UID']])
#print("core data :",core_data)
# Step 2 - we assemble category data going up from the core data nodes
category_data=[]
for c in self.selected_categories.keys():
column=[]
for x in self.node_report_fields:
if x[1]==c:
data_source_type2=x[0]
data_type=json.loads(self.template_tree[c.split(':')[0]]['data_fields'][c.split(':')[1]])['type']
#print("entering loop with ",link_node,c.split(':')[0],data_source_type2)
out=self.extend_tree(link_node,c.split(':')[0],data_source_type2)
for n in self.core_data:
#print(n,"match "+out+"(t {UID:'"+n[1]+"}) return x.`"+c.split(':')[1]+"` limit 1")
if out=='':
if self.selected_categories[c]=="any":
result=self.evaluate_query("match (x {UID:'"+n[1]+"'}) return x.`"+c.split(':')[1]+"` limit 1")
else:
result=self.evaluate_query("match (x {UID:'"+n[1]+"'})-[:"+self.selected_categories[c]+"]->() return x.`"+c.split(':')[1]+"` limit 1")
else:
if self.selected_categories[c]=="any":
result=self.evaluate_query("match "+out+"(t {UID:'"+n[1]+"'}) return x.`"+c.split(':')[1]+"` limit 1")
else:
if data_source_type2=='node':
result=self.evaluate_query("match "+out+"(t {UID:'"+n[1]+"'}) where exists((x)-[:"+self.selected_categories[c]+"]->()) return x.`"+c.split(':')[1]+"` limit 1")
else:
result=self.evaluate_query("match "+out+"(t {UID:'"+n[1]+"'}) return x.`"+c.split(':')[1]+"` limit 1")
column.append(self.convert_data(result,data_type))
category_data.append(column)
#print(category_data)
total_data=[]
for i in range(len(self.core_data)):
append_part=[]
append_part.append(self.core_data[i][1])
append_part.append(self.core_data[i][0])
for j,c in enumerate(self.selected_categories.keys()):
append_part.append(category_data[j][i])
total_data.append(append_part)
data_columns=["UID","`"+selected_data+"`"]
self.plot_kdims=[]
for c in self.selected_categories.keys():
self.plot_kdims.append("`"+c+"`")
self.plot_vdims=["`"+selected_data+"`"]
data_columns.extend(self.plot_kdims)
except:
total_data=[]
data_columns=["UID"]
self.plot_vdims=[]
self.plot_kdims=[]
#print("total data" ,total_data)
self.report_df=pd.DataFrame(total_data,columns=data_columns)
plot=self.report_choice(report_type)
#plot=hv.BoxWhisker(data=self.report_df,kdims=self.plot_kdims,vdims=self.plot_vdims).opts(height=750, width=1000)
self.floatpanel_report.clear()
self.workspace.pop(-1)
close_bt=pn.widgets.Button(name="close panel",button_type="success")
close_bt.window="self.workspace"
close_bt.floater="self.floatpanel_report"
close_bt.on_click(self.floater_close)
self.floatpanel_report.append(close_bt)
if self.selected_filter=={}:
html_pane = pn.pane.HTML("<h1>Report : data points :"+str(len(self.core_data))+"</h1>")
else:
filterstring=""
for k in self.selected_filter.keys():
filterstring = filterstring + " "+k+":"+self.selected_filter[k]
html_pane = pn.pane.HTML("<h1>Report : data points :"+str(len(self.core_data))+" Filters:"+filterstring+"</h1>")
self.floatpanel_report.append(html_pane)
report_items = [('Boxplot', 'a'), ('Violin', 'b'),('Dataframe','c')]
change_report_type=pn.widgets.MenuButton(name="Report type", items=report_items, button_type='primary')
change_report_type.target=startpoint
change_report_type.on_click(self.adapt_report)
self.floatpanel_report.append(change_report_type)
self.floatpanel_report.append(plot)
self.workspace.append(self.floatpanel_report)
return
def report_choice(self,report_type):
    """
    Build the panel/holoviews object that visualises ``self.report_df``.

    Parameters
    ----------
    report_type : str
        'a' for a box plot, 'b' for a violin plot, 'c' for an interactive
        table and 'd' for the LLM based analysis. When ``self.plot_vdims``
        holds more than one value dimension the table ('c') is always used.

    Returns
    -------
    object
        A holoviews element ('a', 'b'), a Tabulator widget ('c') or a
        panel Column ('d').

    Raises
    ------
    ValueError
        If ``report_type`` is not one of the supported codes.
    """
    # Several value dimensions cannot be drawn as one box/violin plot.
    if len(self.plot_vdims)>1:
        report_type='c'

    if report_type=="a":
        return hv.BoxWhisker(data=self.report_df,kdims=self.plot_kdims,vdims=self.plot_vdims).opts(height=750, width=1000)

    if report_type=="b":
        return hv.Violin(data=self.report_df,kdims=self.plot_kdims,vdims=self.plot_vdims).opts(height=750, width=1000)

    if report_type=="c":
        # One header filter per visible column (the first column, UID, is hidden).
        tabulator_filters={i:{'placeholder':"Enter"+str(i)} for i in self.report_df.columns[1:]}
        plot = pn.widgets.Tabulator(self.report_df, layout='fit_columns', theme='fast', align='center', sizing_mode='stretch_height',
                                    show_index=False, disabled=True, selectable=False, width=1000,
                                    hidden_columns=['UID'],
                                    header_filters = tabulator_filters, pagination='remote',page_size=20)

        def go_to_object(e):
            # Clicking a row closes the report panel and opens the clicked node.
            row = plot.value.loc[e.row]
            self.workspace.pop(-1)
            del self.floatpanel_report
            self.create_viewbox(row['UID'])

        plot.on_click(go_to_object)
        return plot

    if report_type=="d":
        # SECURITY: an OpenAI API key used to be hard-coded on this line.
        # Secrets must not live in source control: provide the key through the
        # OPENAI_API_KEY environment variable of the server process and revoke
        # the key that was committed previously.

        # Three scenarios: only internal data (no keywords), only external
        # data (empty report) or a combination of both.
        if self.keywords_selected=="":
            scenario="int_only"
        elif self.report_df.size==0:
            scenario="ext_only"
        else:
            scenario="hybrid"
        uses_internal=scenario in ("hybrid","int_only")
        uses_external=scenario in ("hybrid","ext_only")

        def cypher_literal(text):
            # Escape a value so it can be placed inside a single-quoted Cypher string.
            return str(text).replace("\\","\\\\").replace("'","\\'")

        keyword=[]
        if uses_external:
            for k in self.keywords_selected.split(","):
                keyword.extend(self.evaluate_query("match (x:Keyword) where toUpper(x.Name) contains '"+cypher_literal(k.upper())+"' return collect(x.Name)"))
            keyword=sorted(set(keyword))
            print(keyword)

        query_template="./"+scenario+"_prompt.txt"
        with open(query_template, 'r') as file:
            query = file.read()
        if uses_internal:
            query = query + str(self.report_df.drop(columns=['UID']).to_markdown(index=False))
        if self.llm_query=="":
            # Default question used when the user did not enter one.
            query = query + " \n\nUser : Please summarize how the incorporation of Fluc mRNA influences the LNP particle size. Are there specific forms of Fluc mRNA that have more influence on size than others. Be precise on those differences.\n"
        else:
            query = query + " \n\nUser : " + self.llm_query

        if uses_external:
            # NOTE(review): this returns k.Name (the keywords themselves), not the
            # names of the synonym nodes x - confirm whether x.Name was intended.
            syns=self.evaluate_query("match (k:Keyword)-[:SYN]-(x:Keyword) where k.Name in ['"+"','".join(cypher_literal(x) for x in keyword)+"'] return collect(k.Name)")
            syns.extend(keyword)
            print("number of syn keywords ",len(syns))
            syn_list="','".join(cypher_literal(x) for x in syns)
            text_meta=self.evaluate_query("match (y:Publication)-->(x:Text_chunk)<-[:REFERS]-(k:Keyword) where k.Name in ['"+syn_list+"'] return collect(['Text',y.Name,x.Text])")
            print("block found ",len(text_meta))
            text_meta.extend(self.evaluate_query("match (y:Publication)-->(x:Table_chunk)<-[:REFERS]-(k:Keyword) where k.Name in ['"+syn_list+"'] return collect(['Table',y.Name,x.Text])"))
            print("block found including tables ",len(text_meta))
            # De-duplicate on the (type, source, text) triple itself; joining the
            # parts with a separator breaks when the text contains the separator.
            unique_chunks=list({tuple(x) for x in text_meta})
            text_blocks=[c[2] for c in unique_chunks]
            metadata=[{'Type':c[0],'Source':c[1]} for c in unique_chunks]
            print("block found without duplicates ",len(text_blocks))
        else:
            print("running without ext context")
            text_blocks=[]
            metadata=[]

        # The LLM call itself is currently disabled; query, text_blocks and
        # metadata are its prepared inputs.
        result=None  # run_llm(text_blocks,metadata,query,k=50,temp=0)
        if result is None:
            # Without a result there is nothing to render; show a message
            # instead of failing on attribute access of a missing result.
            return pn.Column("Query : "+self.llm_query,pn.pane.Markdown("No LLM result available: the LLM call is currently disabled."))
        if uses_external:
            source_docs = result.get('source_documents', [])
            source_doc_table=[[doc.metadata['Source'],doc.page_content] for doc in source_docs]
            df = pd.DataFrame(source_doc_table,columns=['source','text'])
            return pn.Column("Query : "+self.llm_query,pn.pane.Markdown(result['result']),pn.widgets.Tabulator(df,name="References",hidden_columns=['index']))
        return pn.Column("Query : "+self.llm_query,pn.pane.Markdown(result.content))

    raise ValueError("Unknown report type: "+repr(report_type))
def adapt_report(self,event):
    """
    Redraw the floating report panel with the report type chosen in the menu.

    ``event.new`` carries the selected report code and ``event.obj.target``
    the start point(s) that are handed over to the newly created menu button.
    """
    origin=event.obj.target
    rendered=self.report_choice(event.new)

    # Empty the panel and take it off the workspace before rebuilding it.
    self.floatpanel_report.clear()
    self.workspace.pop(-1)

    closer=pn.widgets.Button(name="close panel",button_type="success")
    closer.window="self.workspace"
    closer.floater="self.floatpanel_report"
    closer.on_click(self.floater_close)
    self.floatpanel_report.append(closer)

    # Headline: number of data points, followed by the active filters if any.
    if self.selected_filter=={}:
        filter_suffix=""
    else:
        filter_suffix=" Filters:"+"".join(" "+key+":"+self.selected_filter[key] for key in self.selected_filter.keys())
    self.floatpanel_report.append(pn.pane.HTML("<h1>Report : data points :"+str(len(self.core_data))+filter_suffix+"</h1>"))

    type_menu=pn.widgets.MenuButton(
        name="Report type",
        items=[('Boxplot', 'a'), ('Violin', 'b'),('Dataframe','c')],
        button_type='primary',
    )
    type_menu.target=origin
    type_menu.on_click(self.adapt_report)

    for part in (type_menu,rendered):
        self.floatpanel_report.append(part)
    self.workspace.append(self.floatpanel_report)
def display_search(self,event=None,filter="",handback=None,field="x.Name"):
    """
    Show a floating search panel listing all nodes that match a filter.

    Parameters
    ----------
    event : panel event, optional
        When given, the filter and handback statement are read from
        ``event.obj.filter`` and ``event.obj.handback`` instead of the
        keyword arguments.
    filter : str
        Cypher ``where`` clause for the node ``x``; it is extended with
        ``and <field> is not null``. When empty, only the not-null condition
        is applied.
    handback : str, optional
        Python expression evaluated when a row is clicked, with ``self`` and
        ``row`` in scope. Defaults to opening the clicked node in the viewbox.
    field : str
        Cypher expression shown as the searchable column.
    """
    try:
        self.floatpanel_search.clear()
    except AttributeError:
        # No search panel exists yet (or it was deleted after its last use).
        config = {"headerControls": {"maximize" : "remove","close" : "remove"}}
        self.floatpanel_search=pn.layout.FloatPanel(name='Search', contained=False, margin=20,height=800,position="center",config=config)

    if event is not None:
        base_filter=event.obj.filter
        handback_statement=event.obj.handback
    else:
        base_filter=filter
        handback_statement=handback
    if handback_statement is None:
        handback_statement="self.create_viewbox(row['UID'])"

    if base_filter!="":
        # Surround "and" with whitespace so a filter without a trailing blank
        # does not get glued to it.
        filter_statement=base_filter+" and "+field+" is not null"
    else:
        filter_statement=" where "+field+" is not null"
    print(filter_statement,handback_statement)

    # Treat a missing result like an empty one.
    initial_set=self.evaluate_query("match (x) "+filter_statement+" return collect([x.UID,"+field+"])") or []
    print("returned length ",len(initial_set))
    if initial_set:
        df=pd.DataFrame(initial_set,columns=['UID',field])
    else:
        df=pd.DataFrame({'UID':[],field:[]})

    tabulator_filters = {
        field:{'placeholder':'Enter '+field+' Name'}
    }
    study_overview_wd = pn.widgets.Tabulator(df, layout='fit_columns', theme='fast', align='center', sizing_mode='stretch_height',
                                             show_index=False, disabled=True, selectable=False, width=1000,
                                             hidden_columns=['UID'],
                                             header_filters = tabulator_filters, pagination='remote',page_size=20,stylesheets=['./assets/stylesheets/tabulator.css'])

    def go_to_object(e):
        # Close the search panel, then hand the clicked row to the caller.
        row = study_overview_wd.value.loc[e.row]
        self.workspace.pop(-1)
        del self.floatpanel_search
        # NOTE(security): the handback statement is evaluated as Python code.
        # It must only ever be set by trusted application code, never derived
        # from user input.
        eval(handback_statement,{"self":self,"row":row})

    study_overview_wd.on_click(go_to_object)

    close_bt=pn.widgets.Button(name="close panel",button_type="success")
    close_bt.window="self.workspace"
    close_bt.floater="self.floatpanel_search"
    close_bt.on_click(self.floater_close)
    self.floatpanel_search.append(close_bt)
    self.floatpanel_search.append(study_overview_wd)
    self.workspace.append(self.floatpanel_search)
    return
def open_json_editor(self,event):
    """
    Open the floating "Field format editor" for one template field.

    The clicked button provides ``event.obj.direction`` ('up', 'down' or
    'middle') and ``event.obj.position``, which locate the text widget
    holding the field's JSON format definition. The definition is parsed
    and presented as individual widgets.

    Raises
    ------
    ValueError
        If ``event.obj.direction`` is not 'up', 'down' or 'middle'.
    """
    try:
        self.floatpanel_json.clear()
    except AttributeError:
        # No editor panel exists yet (or it was deleted after its last use).
        config = {"headerControls": {"maximize" : "remove"}}
        self.floatpanel_json=pn.layout.FloatPanel(name='Field format editor', contained=False, margin=20,height=600,position="center",config=config)

    # Remember where the edited value lives; save_json_editor reads these back.
    self.floatpanel_json.target_position=event.obj.position
    self.floatpanel_json.direction=event.obj.direction
    position=self.floatpanel_json.target_position
    direction=event.obj.direction
    if direction=='up':
        inputvalue=self.upnav[0][self.safe_int(position[0])][1][self.safe_int(position[1])][0].value
    elif direction=='down':
        inputvalue=self.downnav[0][self.safe_int(position[0])][1][self.safe_int(position[1])][0].value
    elif direction=='middle':
        inputvalue=self.workspace[1][self.safe_int(position)][0].value
    else:
        raise ValueError("Unknown editor direction: "+repr(direction))

    # An empty, malformed or non-object definition falls back to a plain text
    # field instead of crashing the editor.
    default_definition={"type":"text"}
    if inputvalue=='':
        type_dict=default_definition
    else:
        try:
            type_dict=json.loads(inputvalue)
        except (TypeError,ValueError):
            type_dict=default_definition
        if not isinstance(type_dict,dict):
            type_dict=default_definition

    close_bt=pn.widgets.Button(name="close panel",button_type="success")
    close_bt.window="self.workspace"
    close_bt.floater="self.floatpanel_json"
    close_bt.on_click(self.floater_close)
    self.floatpanel_json.append(close_bt)
    self.floatpanel_json.append(pn.pane.HTML("<h1> Field format editor </h1>"))

    # The order of the following widgets matters: save_json_editor reads them
    # back by their index (2 to 6) in the panel.
    self.floatpanel_json.append(pn.widgets.Select(name='data type',options=['text','int','float','date','datetime','radio','check','multichoice','password'],value=type_dict.get('type','text')))
    text_fields=(
        ('options','options',''),
        ('lower range','lower',''),
        ('upper range','upper',''),
        ('display tab','tab','General'),
    )
    for widget_name,key,default in text_fields:
        self.floatpanel_json.append(pn.widgets.TextInput(name=widget_name,value=type_dict.get(key,default)))

    b=pn.widgets.Button(name="Save",button_type="primary")
    b.on_click(self.save_json_editor)
    self.floatpanel_json.append(b)
    self.workspace.append(self.floatpanel_json)
    return
def save_json_editor(self,event):
    """
    Write the field format editor's values back as JSON and close the editor.

    The widgets at positions 2 to 6 of ``self.floatpanel_json`` (data type,
    options, lower range, upper range, display tab) are collected into a
    dict, serialised and stored in the text widget addressed by the panel's
    ``direction`` and ``target_position``. Afterwards the panel is removed
    from the workspace and deleted.
    """
    panel=self.floatpanel_json
    type_dict={
        'type':panel[2].value,
        'options':panel[3].value,
        'lower':panel[4].value,
        'upper':panel[5].value,
        'tab':panel[6].value,
    }
    encoded=json.dumps(type_dict)
    position=panel.target_position
    if panel.direction=="up":
        self.upnav[0][self.safe_int(position[0])][1][self.safe_int(position[1])][0].value=encoded
    elif panel.direction=="down":
        self.downnav[0][self.safe_int(position[0])][1][self.safe_int(position[1])][0].value=encoded
    elif panel.direction=="middle":
        # Index through safe_int, exactly as open_json_editor does when it
        # reads the value from this same position.
        self.workspace[1][self.safe_int(position)][0].value=encoded
    self.workspace.pop(-1)
    del self.floatpanel_json
    return
def show_similars(self,event):
    """
    List the compounds of class 'IL' that resemble a reference compound.

    ``event.obj.target`` is the reference compound (read via its 'Name',
    'Pub_name' and 'SMILES' entries) and ``event.obj.sim_value`` the
    similarity cutoff. Every other 'IL' compound whose fingerprint
    similarity to the reference exceeds the cutoff is shown with its
    structure image, a GO button and an add-to-cart button.
    """
    try:
        self.floatpanel_chem.clear()
        self.workspace.pop(-1)
    except (AttributeError,IndexError):
        # No panel exists yet, or nothing is left on the workspace to remove.
        config = {"headerControls": {"maximize" : "remove","close" : "remove"}}
        self.floatpanel_chem=pn.layout.FloatPanel(name='Chem Display', contained=False, margin=20,height=600,position="center",config=config)

    close_bt=pn.widgets.Button(name="close panel",button_type="success")
    close_bt.window="self.workspace"
    close_bt.floater="self.floatpanel_chem"
    close_bt.on_click(self.floater_close)
    self.floatpanel_chem.append(close_bt)

    reference=event.obj.target
    cutoff=float(event.obj.sim_value)

    # Escape the name so quotes or backslashes in it cannot break the query.
    ref_name=str(reference['Name']).replace("\\","\\\\").replace("'","\\'")
    ILS=self.evaluate_query("match (c:Compound)<--(:Class {Name:'IL'}) where c.SMILES is not null and c.Name <> '"+ref_name+"' return collect([c.Name,c.Pub_name,c.SMILES,c.UID])") or []

    fpgen = AllChem.GetRDKitFPGenerator()
    ref_mol=Chem.MolFromSmiles(reference['SMILES'])
    ref_finger=fpgen.GetFingerprint(ref_mol)

    output=pn.Column()
    img = Draw.MolToImage(ref_mol,size=(600,600))
    myIO=BytesIO()
    img.save(myIO,format="PNG")
    output.append(pn.Row(pn.Column("Reference",str(reference['Name']),str(reference['Pub_name'])),pn.pane.PNG(myIO)))

    def go_to_object(e):
        # Close the panel and open the selected compound.
        UID=e.obj.target
        self.workspace.pop(-1)
        del self.floatpanel_chem
        self.create_viewbox(UID)

    svg="""<svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round"
stroke-linejoin="round" class="icon icon-tabler icons-tabler-outline icon-tabler-shopping-cart-plus"><path stroke="none" d="M0 0h24v24H0z"
fill="none"/><path d="M4 19a2 2 0 1 0 4 0a2 2 0 0 0 -4 0" /><path d="M12.5 17h-6.5v-14h-2" /><path d="M6 5l14 1l-.86 6.017m-2.64 .983h-10.5" /><path d="M16 19h6" /><path d="M19 16v6" /></svg>"""

    for name,pub_name,smiles,uid in ILS:
        mol=Chem.MolFromSmiles(smiles)
        if mol is None:
            # SMILES that cannot be parsed cannot be compared or drawn.
            continue
        similarity=DataStructs.FingerprintSimilarity(ref_finger,fpgen.GetFingerprint(mol))
        if not similarity>cutoff:
            continue
        img = Draw.MolToImage(mol,size=(600,600))
        myIO=BytesIO()
        img.save(myIO,format="PNG")
        go_bt=pn.widgets.Button(name='GO',button_type='success')
        go_bt.target=uid
        go_bt.on_click(go_to_object)
        cart_bt = pn.widgets.ButtonIcon(icon=svg,active_icon=svg, styles={'background':'lightgreen'}, size="4em")
        cart_bt.target=uid
        cart_bt.on_click(self.add_to_cart)
        output.append(pn.Row(pn.Column(name,pub_name,"Tanimoto "+str(similarity),go_bt,cart_bt),pn.pane.PNG(myIO)))
    self.floatpanel_chem.append(output)

    sim_cut=pn.widgets.FloatInput(name="similarity cutoff",value=cutoff)
    show_sim=pn.widgets.Button(name="Show sims",button_type='primary')
    # Start from the cutoff that is displayed, so pressing the button again
    # without editing the input repeats the same search.
    show_sim.sim_value=cutoff
    show_sim.target=reference
    show_sim.on_click(self.show_similars)
    sim_cut.link(show_sim, value='sim_value')
    self.floatpanel_chem.append(sim_cut)
    self.floatpanel_chem.append(show_sim)
    self.workspace.append(self.floatpanel_chem)
    return
def show_chemical(self,event):
    """
    Show the structure of ``event.obj.target`` in the floating chemistry panel.

    The compound's 'SMILES' entry is rendered as a PNG. Below the image a
    similarity cutoff input (0.75 by default) and a button that starts
    ``show_similars`` for the same compound are offered.
    """
    try:
        self.floatpanel_chem.clear()
    except:
        # Any failure to clear the panel leads to a fresh panel being created.
        self.floatpanel_chem=pn.layout.FloatPanel(
            name='Chem Display', contained=False, margin=20,height=600,position="center",
            config={"headerControls": {"maximize" : "remove","close" : "remove"}},
        )

    compound=event.obj.target
    picture=Draw.MolToImage(Chem.MolFromSmiles(compound['SMILES']),size=(600,600))
    png_buffer=BytesIO()
    picture.save(png_buffer,format="PNG")

    closer=pn.widgets.Button(name="close panel",button_type="success")
    closer.window="self.workspace"
    closer.floater="self.floatpanel_chem"
    closer.on_click(self.floater_close)
    self.floatpanel_chem.append(closer)
    self.floatpanel_chem.append(pn.pane.PNG(png_buffer))

    cutoff_input=pn.widgets.FloatInput(name="similarity cutoff",value=0.75)
    similars_bt=pn.widgets.Button(name="Show sims",button_type='primary')
    similars_bt.sim_value=0.75
    similars_bt.target=event.obj.target
    similars_bt.on_click(self.show_similars)
    # Edits of the cutoff input are copied onto the button's sim_value.
    cutoff_input.link(similars_bt, value='sim_value')

    for widget in (cutoff_input,similars_bt):
        self.floatpanel_chem.append(widget)
    self.workspace.append(self.floatpanel_chem)
def add_to_cart(self,event):
    """
    Append a node to the user's shopping cart and persist the cart.

    ``event.obj.target`` is either a node or the UID string of a node, in
    which case the node is looked up first. The cart (``self.shopping_cart``,
    a DataFrame with columns UID, Name and Type) gets one more row and is
    stored as JSON on the current user's ``cart`` property.
    """
    if isinstance(event.obj.target,str):
        target=self.evaluate_query("match (x {UID:'"+event.obj.target+"'}) return x")
        if target is None:
            # The node no longer exists, so there is nothing to add.
            return
    else:
        target=event.obj.target
    new_row=pd.DataFrame(
        [[str(target['UID']),str(target['Name']),str(list(target.labels)[0])]],
        columns=['UID','Name','Type'],
    )
    # ignore_index renumbers the rows 0..n-1.
    self.shopping_cart=pd.concat([self.shopping_cart,new_row],ignore_index=True)
    # The JSON text is embedded in a single-quoted Cypher string literal:
    # backslashes and single quotes must be escaped, otherwise names that
    # contain them break the statement or corrupt the stored JSON.
    cart_json=self.shopping_cart.to_json().replace("\\","\\\\").replace("'","\\'")
    g_text="match (u:User {UID:'"+self.current_user.UID+"'}) set u.cart='"+cart_json+"'"
    self.run_query(g_text)
    return
def update_anchor_point(self,UID):
    """
    Make the node with the given UID the anchor point.

    The UID is stored on the anchor search button and the anchor text input
    is set to the result of a query returning '<first label>-<Name>'.
    """
    self.anchor_search_bt.UID=UID
    label_query="".join(["match (x {UID:'",UID,"'}) return labels(x)[0]+'-'+x.Name"])
    self.anchor_point.value=self.evaluate_query(label_query)
def update_process_anchor_change(self,event):
    """
    Watcher callback: rebuild the left chat panel in its non-initial mode.

    ``event`` is accepted for the watcher signature and is not used.
    """
    # Clearing the flag makes left_chat_update take its non-initial branch.
    self.left_initial=False
    self.left_chat_update()
def left_chat_update(self):
    """
    Rebuild the "Internal data" side panel (``self.upnav``) of the chat view.

    While ``self.left_initial`` is true the anchor point widgets are created.
    Otherwise the report builder widgets (value, categories, filters and the
    "Sample report" button) are rebuilt for the anchor point whose UID is
    stored on ``self.anchor_search_bt``.

    In both cases ``self.upnav`` ends up holding only the heading and
    ``self.chat_int_stores``; the lines that would add the anchor selector
    and ``self.internal_data_set`` to it are commented out.
    """
    if self.left_initial:
        self.anchor_point_selector=pn.Column()
        self.anchor_point=pn.widgets.TextInput(name="Anchor point")
        # A changed anchor value rebuilds this panel in its non-initial mode.
        self.anchor_point.param.watch(self.update_process_anchor_change, ['value'], onlychanged=True)
        self.anchor_search_bt=pn.widgets.Button(name="Search Anchor point",button_type="success")
        self.anchor_search_bt.on_click(self.display_search)
        # display_search reads filter and handback from the clicked button.
        self.anchor_search_bt.filter=""
        self.anchor_search_bt.handback="self.update_anchor_point(row['UID'])"
        self.anchor_search_bt.UID=""
        self.internal_data_set=pn.Column()
        self.anchor_point_selector.append(pn.Row(self.anchor_point,self.anchor_search_bt))
        self.upnav.clear()
        html_pane = pn.pane.HTML("<h1> Internal data </h1>")
        self.upnav.append(html_pane)
        #self.upnav.append(self.chat_with_internal)
        #self.upnav.append(self.anchor_point_selector)
        #self.upnav.append(self.internal_data_set)
        self.upnav.append(self.chat_int_stores)
    else:
        startpoint=self.anchor_search_bt.UID
        print("injected startpoint = ",startpoint)
        connectome=self.find_edges(startpoint)
        print("connectome build")
        # Collect every selectable field as [kind, "<template>:<field>"],
        # first for the entries of the connectome, then for their relationships.
        self.node_report_fields=[]
        relationships_in_scope=[]
        for n in connectome:
            nodedata=self.template_tree.get(n,'')
            if nodedata!='':
                fields=nodedata.get('data_fields',{})
                if fields!={}:
                    for f in fields.keys():
                        if f!='Keys':
                            self.node_report_fields.append(['node',n+":"+f])
                #Get relevant relationships
                relationships_in_scope.extend([i[1] for i in nodedata.get('uplinks')])
                relationships_in_scope.extend([i[1] for i in nodedata.get('downlinks')])
        for r in set(relationships_in_scope):
            nodedata=self.template_tree.get(r,'')
            if nodedata!='':
                fields=nodedata.get('data_fields',{})
                if fields!={}:
                    for f in fields.keys():
                        if f!='Keys':
                            self.node_report_fields.append(['link',r+":"+f])
        print("scope build")
        self.node_report_fields.sort(key=self.takesecondkey)
        #print("prestored values ",self.selected_data,self.selected_categories,self.selected_filter)
        options=[i[1] for i in self.node_report_fields]
        self.generate_bt=pn.widgets.Button(name="Sample report",button_type="primary")
        self.generate_bt.target=[startpoint]
        #key_options=sorted(self.graph.run("match (x:Keyword) where not 'Template' in labels(x) return collect (distinct x.Name)") )
        #self.keywords_box=pn.widgets.TextInput(name="Keywords",value='')
        #self.keyword_option=pn.widgets.Select(name="keyword hints",value='',options=[''])
        #self.query_box=pn.widgets.TextAreaInput(name="Query",value=self.llm_query,auto_grow=True,resizable='both',max_rows=10, rows=6)
        #self.keyword_option.param.watch(self.push_keyword, ['value'], onlychanged=True)
        #self.keywords_box.param.watch(self.hint_keyword, ['value'], onlychanged=True)
        #self.keywords_box.value=self.keywords_selected
        # Value selector: at most one field, pre-filled with the stored selection.
        if self.selected_data!=None:
            select_value=pn.widgets.MultiChoice(name="value",options=options,value=self.selected_data,search_option_limit=50,max_items=1)
            self.generate_bt.datavalue=self.selected_data
        else:
            select_value=pn.widgets.MultiChoice(name="value",options=options,search_option_limit=50,max_items=1)
            self.generate_bt.datavalue=[]
        # Category selector; watch_type="field" marks a change of the chosen
        # fields, as opposed to "value" on the per-field widgets below.
        if self.selected_categories!={}:
            values=[i for i in self.selected_categories.keys()]
            select_categories=pn.widgets.MultiChoice(name="categories",options=options,value=values,search_option_limit=50)
            select_categories.watch_type="field"
            select_categories.param.watch(self.categories_update_chat, ['value'], onlychanged=True)
            self.generate_bt.categories=self.selected_categories
        else:
            select_categories=pn.widgets.MultiChoice(name="categories",options=options,search_option_limit=50)
            select_categories.watch_type="field"
            select_categories.param.watch(self.categories_update_chat, ['value'], onlychanged=True)
            self.generate_bt.categories=[]
        # One select per chosen category: "any" or one of the template's downlinks.
        categories=pn.Column()
        for f in self.selected_categories.keys():
            cat_options=["any"]
            cat_options.extend([i[1] for i in self.template_tree.get(f.split(':')[0],{}).get('downlinks',[])])
            filter_value=pn.widgets.Select(name=f.split(':')[0]+":"+f.split(':')[1],options=cat_options,value=self.selected_categories[f])
            #filter_value,_,_=self.choose_filter_widget(f.split(':')[0],f.split(':')[1],self.selected_categories[f])
            filter_value.watch_type="value"
            filter_value.field=f
            filter_value.param.watch(self.categories_update_chat, ['value'], onlychanged=True)
            categories.append(filter_value)
        # Filter selector, built the same way as the category selector.
        if self.selected_filter!={}:
            values=[i for i in self.selected_filter.keys()]
            select_filters=pn.widgets.MultiChoice(name="filters",options=options,value=values,search_option_limit=50)
            select_filters.watch_type="field"
            select_filters.param.watch(self.filter_update_chat, ['value'], onlychanged=True)
            self.generate_bt.filters=self.selected_filter
        else:
            select_filters=pn.widgets.MultiChoice(name="filters",options=options,search_option_limit=50)
            select_filters.watch_type="field"
            select_filters.param.watch(self.filter_update_chat, ['value'], onlychanged=True)
            self.generate_bt.filters=[]
        # One value widget per chosen filter, supplied by choose_filter_widget.
        filters=pn.Column()
        for f in self.selected_filter.keys():
            filter_value,_,_=self.choose_filter_widget(f.split(':')[0],f.split(':')[1],self.selected_filter[f])
            filter_value.watch_type="value"
            filter_value.field=f
            filter_value.param.watch(self.filter_update_chat, ['value'], onlychanged=True)
            filters.append(filter_value)
        self.internal_data_set.clear()
        self.generate_bt.report_type='Dataframe'
        self.generate_bt.on_click(self.generate_sample_report)
        self.internal_data_set.append(pn.Column(select_value,select_categories,categories,select_filters,filters,self.generate_bt))
        self.upnav.clear()
        html_pane = pn.pane.HTML("<h1> Internal data </h1>")
        self.upnav.append(html_pane)
        # self.upnav.append(self.chat_with_internal)
        # self.upnav.append(self.anchor_point_selector)
        # self.upnav.append(self.internal_data_set)
        self.upnav.append(self.chat_int_stores)
        #self.generate_bt.target=[startpoint]
        # Copy later selector changes onto the report button's attributes.
        select_value.link(self.generate_bt, value='datavalue')
        select_categories.link(self.generate_bt,value='categories')
        select_filters.link(self.generate_bt,value='filters')
        #report_items = ['Boxplot', 'Violin','Dataframe','LLM']
        #report_type=pn.widgets.Select(name="Report type", options=report_items,value='Boxplot')
        #report_type.link(self.generate_bt, value='report_type')
        #self.generate_bt.report_type='Boxplot'
        #close_bt=pn.widgets.Button(name="close panel",button_type="danger")
        #close_bt.window="self.workspace"
        #close_bt.floater="self.floatpanel_report"
        #close_bt.on_click(self.floater_close)
        #self.floatpanel_report.append(close_bt)
    return
def right_chat_update(self):
    """
    Rebuild the right-hand panel (``self.downnav``) of the chat view.

    The panel is cleared and refilled with the external chat switch, freshly
    created keyword widgets and the existing chat setting widgets, grouped
    under their section headings.
    """
    self.downnav.clear()
    title=pn.pane.HTML("<h1> Literature data and external settings </h1>")
    self.downnav.append(title)
    self.downnav.append(self.chat_with_external)

    # Keyword entry plus hint selector. The watchers are registered before
    # the stored keywords are assigned to the input.
    self.keywords_box=pn.widgets.TextInput(name="Keywords",value='',width=200)
    self.keyword_option=pn.widgets.Select(name="keyword hints",value='',options=[''])
    self.keyword_option.param.watch(self.push_keyword, ['value'], onlychanged=True)
    self.keywords_box.param.watch(self.hint_keyword, ['value'], onlychanged=True)
    self.keywords_box.value=self.keywords_selected
    self.downnav.append(pn.Row(self.keywords_box,self.keyword_option))

    # (heading, widget attribute names) in display order; the first group
    # has no heading of its own.
    sections=(
        (None,("chat_context_items","chat_with_web","chat_with_history")),
        ("<h3>Extensive Search Settings</h3>",("chat_extensive_search","chat_extensive_chunks","chat_summary_tokens","chat_detail_level")),
        ("<h3>Filtering Settings</h3>",("chat_keyword_filtering","chat_manual_keywords")),
        ("<h3>Reference Filtering</h3>",("chat_reference_filtering","chat_relevance_threshold")),
        ("<h3>Response Instructions</h3>",("chat_instruction_template","chat_detailed_instructions")),
    )
    for heading,widget_names in sections:
        if heading is not None:
            self.downnav.append(pn.pane.HTML(heading))
        for widget_name in widget_names:
            self.downnav.append(getattr(self,widget_name))

    self.downnav.append(pn.Row(self.chat_instruction_name, self.chat_save_instructions, self.chat_load_instructions))
def create_chatbox(self):
    """
    Add the chat interface to the workspace and build both side panels.

    A new ``OneCo_hybrid_RAG`` instance is stored on ``self.chatbox``; its
    available collections and instruction template names are offered in the
    corresponding selection widgets. The loading indicator is switched on at
    the start and off at the end.
    """
    self.loading_indicator.name="Loading"
    self.loading_indicator.value=True

    self.workspace.append(pn.pane.HTML("<h1> Ask me anything </h1>"))

    self.chatbox=OneCo_hybrid_RAG()
    self.chat_int_stores.options=self.chatbox.available_collections
    # Offer the instruction templates loaded by the chat backend.
    self.chat_instruction_template.options = list(self.chatbox.instruction_templates.keys())
    self.workspace.append(self.chatbox.chat_interface)

    # Both side panels start in their initial state.
    self.left_initial=self.right_initial=True
    self.left_chat_update()
    self.right_chat_update()

    self.loading_indicator.name="Done"
    self.loading_indicator.value=False
def create_viewbox(self, UID):
"""
Creates a viewbox for the given UID.
This method performs the following steps:
1. Sets a loading indicator.
2. Retrieves the target node from the graph database using the provided UID.
3. Determines the access state (read or write) based on the current user's keys.
4. If the user has no access, displays an access denied message.
5. If the target node is a template, creates an editable view with appropriate widgets.
6. If the target node is not a template, creates a detailed view with widgets and buttons for interaction.
7. Adds options for adding new fields and removing nodes if the user has write access.
8. Updates navigation elements based on the target node's labels.
9. Resets the loading indicator.
Parameters:
UID (str): The unique identifier of the target node.
Returns:
None
"""
self.loading_indicator.name="Loading"
self.loading_indicator.value=True
print('UID' , UID)
target = self.evaluate_query(f"MATCH (o) WHERE o.UID = '{UID}' RETURN o")
target_keys=target.get('Keys','').split(',')
access_state=None
for k in target_keys:
if k in self.current_user.all_write_keys:
access_state="W"
if access_state==None:
for k in target_keys:
if k in self.current_user.all_read_keys:
access_state="R"
#print("access state ",access_state)
if access_state==None:
self.workspace.clear()
editor=pn.Column()
title="Sorry - you have no access to this item"
html_pane = pn.pane.HTML("<h1>"+title+"</h1>")
editor.append(html_pane)
self.workspace.append(editor)
return
if 'Template' in target.labels:
self.workspace.clear()
html_pane = pn.pane.HTML("<h1> Main view </h1>")
self.workspace.append(html_pane)
fields=list(target.keys())
fields.remove('UID')
fields.sort()
fields.insert(0,'UID')
try:
fields.remove('Keys')
except:
pass
#self.editor_map={}
self.watcher_collector=[]
editor=pn.Column()
title=str(target.labels)
html_pane = pn.pane.HTML("<h1>"+title+"</h1>")
editor.append(html_pane)
edit_position=1
for f in fields:
if f in ['UID','RO']:
w=pn.widgets.TextInput(name=f,value=target[f],disabled=True)
w.field=f
w.target=target
w.modus='text'
w.position=edit_position
edit_position=edit_position+1
#self.editor_map[f]=[w,target[f],False]
#self.watcher_collector.append(w.param.watch(self.save_object, ['value'], onlychanged=True))
#w.param.watch(self.save_object, ['value'], onlychanged=True)
editor.append(w)
else:
if access_state=='W':
w=pn.widgets.TextInput(name=f,value=target[f],disabled=False)
else:
w=pn.widgets.TextInput(name=f,value=target[f],disabled=True)
w.field=f
w.target=target
w.modus='text'
w.position=edit_position
if access_state=='W':
b=pn.widgets.Button(name="Edit",button_type="success")
b.position=edit_position
b.direction="middle"
b.on_click(self.open_json_editor)
d=pn.widgets.Button(name="DEL",button_type="danger")
d.field=f
d.target=target
d.direction="middle"
d.on_click(self.delete_field)
self.watcher_collector.append(w.param.watch(self.save_object, ['value'], onlychanged=True))
editor.append(pn.Row(w,b,d))
else:
editor.append(w)
edit_position=edit_position+1
#print("you clicked a template")
self.workspace.append(editor)
else:
self.workspace.clear()
html_pane = pn.pane.HTML("<h1> Main view </h1>")
self.workspace.append(html_pane)
fields=list(target.keys())
fields.remove('UID')
fields.sort()
fields.insert(0,'UID')
try:
fields.remove('Keys')
except:
pass
editor=pn.Column()
self.watcher_collector=[]
title=list(target.labels)[0]+"-"+target.get('Name','')
html_pane = pn.pane.HTML("<h1>"+title+"</h1>")
svg="""<svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round"
stroke-linejoin="round" class="icon icon-tabler icons-tabler-outline icon-tabler-shopping-cart-plus"><path stroke="none" d="M0 0h24v24H0z"
fill="none"/><path d="M4 19a2 2 0 1 0 4 0a2 2 0 0 0 -4 0" /><path d="M12.5 17h-6.5v-14h-2" /><path d="M6 5l14 1l-.86 6.017m-2.64 .983h-10.5" /><path d="M16 19h6" /><path d="M19 16v6" /></svg>"""
bt = pn.widgets.ButtonIcon(icon=svg,active_icon=svg, styles={'background':'lightgreen'}, size="4em")
bt.target=target
bt.on_click(self.add_to_cart)
editor.append(pn.Row(html_pane,bt))
#We find out which tabbs are requested by the template - if any
label=list(target.labels)
try:
label.remove('Template')
except:
pass
tablabels=[]
template=self.template_tree[label[0]]
if template!=None:
fieldvalues=template['data_fields'].values()
#print("fieldvalues", fieldvalues, "from", template['data_fields'])
for i in fieldvalues:
tablabels.append(json.loads(i).get('tab','General'))
tablabels={x:[] for x in sorted(set(tablabels))}
if tablabels=={}:
tablabels={'General':[]}
else:
tablabels={'General':[]}
for f in fields:
w,modus,free_field,tab=self.choose_widget(target,f,access_state)
w.field=f
w.target=target
w.modus=modus
self.watcher_collector.append(w.param.watch(self.save_object, ['value'], onlychanged=True))
if free_field and access_state=='W':
d=pn.widgets.Button(name="DEL",button_type="danger")
d.field=f
d.target=target
d.direction="middle"
d.on_click(self.delete_field)
if f=='SMILES':
c=pn.widgets.Button(name="CHEM",button_type="primary")
c.field=f
c.target=target
c.on_click(self.show_chemical)
tablabels[tab].append(pn.Row(w,c,d))
else:
tablabels[tab].append(pn.Row(w,d))
else:
if f=='SMILES':
c=pn.widgets.Button(name="CHEM",button_type="primary")
c.field=f
c.target=target
c.on_click(self.show_chemical)
tablabels[tab].append(pn.Row(w,c))
else:
tablabels[tab].append(w)
if len(tablabels)==1:
inner_element=pn.Column()
for x in tablabels['General']:
inner_element.append(x)
editor.append(inner_element)
else:
inner_element=pn.Accordion(toggle=True,css_classes=['CPath-accordion1'])
for k in tablabels.keys():
accordion_card=pn.Column()
for x in tablabels[k]:
accordion_card.append(x)
inner_element.append((k,accordion_card))
editor.append(inner_element)
self.workspace.append(editor)
if access_state=='W':
bt = pn.widgets.Button(name='Add new field', button_type='primary',styles={'background':'lightgrey'}, width=300)
add_label = pn.widgets.TextInput(name='New field',value='')
add_value = pn.widgets.TextInput(name='New value',value='')
add_label.link(bt, value='New_label')
add_value.link(bt, value='New_value')
bt.New_label=''
bt.New_value=''
bt.target = target
bt.UID=UID
bt.on_click(self.add_new_field)
newcol = pn.Column(add_label, add_value,bt,styles={'background':'lightgrey'})
self.workspace.append(newcol)
# Node removal - only end of the tree allowed
downlinks=self.evaluate_query("match (x {UID:'"+target['UID']+"'})-->(y) return collect(y.UID)")
if downlinks==[]:
bt = pn.widgets.Button(name='Remove node', button_type='danger',styles={'background':'red'}, width=300)
bt.on_click(self.remove_node)
bt.target = target
bt.UID=UID
self.workspace.append(bt)
#report_items = [('Size - Violin', 'a'), ('Size - boxplot', 'b'), None, ('pKa - boxplot', 'c'),None,('Zeta - boxplot', 'd'),('Size/ratio - boxplot', 'e'),('Generator','f'),('ExpSpace','g')]
#report_items = [('Generator','f'),('ExpSpace','g')]
#report_button = pn.widgets.MenuButton(name='Report', items=report_items, button_type='primary')
#bt = pn.widgets.Button(name='violin report', button_type='primary',styles={'background':'lightgreen'}, width=300)
#bt.report_type='violin'
self.report_button.target=target['UID']
#report_button.on_click(self.display_report)
#self.workspace.append(report_button)
if not('Template' in target.labels):
label=list(target.labels)
try:
label.remove('Template')
except:
pass
self.searchlimitbt.name='Search within class '+label[0]
self.searchlimitbt.filter="WHERE '"+label[0]+"' in labels(x) "
# bt = pn.widgets.Button(name='Search within class '+label[0], button_type='primary',styles={'background':'lightgreen'}, width=300)
# bt.filter="WHERE '"+label[0]+"' in labels(x) "
# bt.handback="self.create_viewbox(row['UID'])"
# bt.on_click(self.display_search)
# self.workspace.append(bt)
# bt = pn.widgets.Button(name='Search global', button_type='primary',styles={'background':'lightgreen'}, width=300)
# bt.filter=""
# bt.handback="self.create_viewbox(row['UID'])"
# bt.on_click(self.display_search)
# self.workspace.append(bt)
if not("Configuration" in target.labels):
self.update_upnav(target)
self.update_downnav(target)
self.loading_indicator.name="Idle..."
self.loading_indicator.value=False
return
def floater_close(self,event):
# Click handler that removes the last element of a container.
# The clicked widget (event.obj) must carry two attributes:
#   floater - read here but not used by the live code (only by the disabled line below)
#   window  - a string holding a Python expression that evaluates to the container
floater=event.obj.floater
window=event.obj.window
# NOTE(review): `window` is executed as Python source. eval() can see this method's
# local names (self, event, floater, window), and it is unsafe if the string can ever be
# influenced by user input - confirm that only hard-coded expressions are stored on the widget.
eval(window+".pop(-1)")
#eval("del "+floater)
return
def go_to_new(self, event):
    """
    Button callback that opens the object referenced by the clicked widget.

    Parameters
    ----------
    event : object
        Click event; ``event.obj.UID`` holds the UID of the object to display.

    Notes
    -----
    After the view box has been rebuilt, the navigation tree selection is
    synchronised on a best-effort basis: the tree may not exist (it is only
    created by ``create_carrousel`` for some node types), so any ordinary
    error while selecting is ignored.
    """
    uid = event.obj.UID
    self.create_viewbox(uid)
    try:
        self.navigation_tree.selected = uid
    except Exception:
        # Deliberately ignored: a missing or stale tree must not break navigation.
        # `Exception` (rather than a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        pass
    return
def save_relationship(self, event):
    """
    Watcher callback that persists an edited relationship property.

    Parameters
    ----------
    event : param event
        ``event.obj`` is the edited widget; it must carry ``target`` (the
        relationship) and ``field`` (the property name). ``event.new`` is the
        new widget value.

    Notes
    -----
    The stored value depends on the field type declared in the template
    (``template['data_fields'][field]``, a JSON string with a ``type`` key):

    * ``date`` / ``datetime`` : ISO formatted string
    * ``check``               : ``'yes'`` or ``'no'``
    * ``multichoice``         : comma separated list of the selected items
    * anything else           : ``str(event.new)``

    Relationships starting at a Template node, and relationships without a
    template, are always stored as ``str(event.new)``.

    The value and the node UIDs are passed as query parameters, so quotes or
    backslashes in the text cannot break the Cypher statement.
    """
    target = event.obj.target
    field = event.obj.field
    # Fetch both end points once; each lookup is a database round trip.
    start_node = self.get_start_node(target)
    end_node = self.get_end_node(target)

    def store(value):
        # A property name cannot be a Cypher parameter, so it is backtick-quoted
        # (embedded backticks doubled); everything else is parameterised.
        # NOTE(review): like the original, this matches every relationship between
        # the two nodes regardless of its type - confirm that is intended.
        query = (
            "match (x {UID:$start_uid})-[r]->(y {UID:$end_uid}) "
            "SET r.`" + str(field).replace("`", "``") + "`=$value"
        )
        self.run_query(
            query,
            {"start_uid": start_node['UID'], "end_uid": end_node['UID'], "value": value},
        )

    if "Template" in list(start_node.labels):
        store(str(event.new))
        return

    # NOTE(review): `target.type` is used as a plain string elsewhere in this file
    # (e.g. `l.type.upper()`), in which case list(...)[0] is only its first
    # character - kept as in the original, confirm the intended template key.
    label = list(target.type)
    template = self.template_tree[label[0]]
    if template is None:
        store(str(event.new))
        return

    field_type = template['data_fields'][field]
    if field_type is None:
        field_type = '{"type":"text"}'
    modus = json.loads(field_type)['type']

    if modus == 'date':
        value = str(dt.date.isoformat(event.new))
    elif modus == 'datetime':
        value = str(dt.datetime.isoformat(event.new))
    elif modus == 'check':
        # Previously computed but never written to the database.
        value = 'yes' if event.new else 'no'
    elif modus == 'multichoice':
        # Previously referenced an undefined name and raised at runtime.
        value = ",".join(str(item) for item in event.new)
    else:
        value = str(event.new)
    store(value)
    return
def add_new_relation_field(self, event):
    """
    Button callback that adds a new property to an existing relationship.

    Parameters
    ----------
    event : object
        Click event. ``event.obj`` must carry ``target`` (the relationship),
        ``New_label`` (property name), ``New_value`` (property value),
        ``relation_name`` (relationship type) and ``direction`` (``"up"``
        refreshes the parent panel, anything else the child panel).

    Notes
    -----
    Nothing is written or refreshed when the property name is empty.
    The property value and node UIDs are passed as query parameters; the
    relationship type and property name cannot be parameters in Cypher and are
    backtick-quoted instead, so free text typed by the user cannot break the
    statement.
    """
    target = event.obj.target
    field = event.obj.New_label
    name = event.obj.relation_name
    if field == '':
        # No property name given - there is nothing to store.
        return

    # One lookup per end point; the nodes are reused for the panel refresh below.
    start_node = self.get_start_node(target)
    end_node = self.get_end_node(target)
    query = (
        "match (x {UID:$start_uid})-[r:`" + str(name).replace("`", "``") + "`]->(y {UID:$end_uid}) "
        "SET r.`" + str(field).replace("`", "``") + "`=$value"
    )
    self.run_query(
        query,
        {
            "start_uid": start_node['UID'],
            "end_uid": end_node['UID'],
            "value": event.obj.New_value,
        },
    )

    # Rebuild the panel the relationship is shown in so the new field appears.
    if event.obj.direction == "up":
        self.update_upnav(end_node)
    else:
        self.update_downnav(start_node)
    return
def remove_relation(self, event):
    """
    Button callback that deletes a relationship between two nodes.

    Parameters
    ----------
    event : object
        Click event. ``event.obj`` must carry ``target`` (the relationship),
        ``relation_name`` (relationship type) and ``direction`` (``"up"``
        refreshes the parent panel, anything else the child panel).

    Notes
    -----
    After the deletion the access tokens of both end points are recalculated
    via ``update_access_tokens``. The node used to refresh the panel is fetched
    again afterwards, so the panel is built from the node as it is stored after
    that recalculation.
    """
    target = event.obj.target
    name = event.obj.relation_name
    # Resolve both UIDs once instead of re-querying the nodes for every use.
    start_uid = self.get_start_node(target)['UID']
    end_uid = self.get_end_node(target)['UID']

    # UIDs are parameters; the relationship type cannot be one, so it is
    # backtick-quoted with embedded backticks doubled.
    query = (
        "match (x {UID:$start_uid})-[r:`" + str(name).replace("`", "``") + "`]->(y {UID:$end_uid}) delete r"
    )
    self.run_query(query, {"start_uid": start_uid, "end_uid": end_uid})

    self.update_access_tokens(start_uid)
    self.update_access_tokens(end_uid)

    if event.obj.direction == "up":
        self.update_upnav(self.get_end_node(target))
    else:
        self.update_downnav(self.get_start_node(target))
    #self.create_carrousel(self.top_UID)
    return
def get_start_node(self, relationship):
    """
    Load the node a relationship starts at.

    Parameters
    ----------
    relationship : neo4j.graph.Relationship
        Relationship whose source node is wanted.

    Returns
    -------
    neo4j.graph.Node or None
        The node as returned by the database, or ``None`` when it cannot be
        found or an error occurs (a message is printed in those cases).

    Notes
    -----
    The node is looked up by the ``element_id`` of ``relationship.start_node``
    using Cypher's ``elementId()``. If that attribute chain is unavailable,
    the first entry of ``relationship.nodes`` is returned instead.
    """
    try:
        element_id = relationship.start_node.element_id
        lookup = "MATCH (n) WHERE elementId(n) = $element_id RETURN n"
        # At most one record is expected for a given element id.
        row = self.run_query(lookup, {"element_id": element_id}).single()
        if not row:
            print(f"No node found with element_id {element_id}")
            return None
        return row['n']
    except AttributeError:
        # No usable element id: fall back to node data carried by the relationship.
        if hasattr(relationship, 'nodes') and len(relationship.nodes) > 0:
            return relationship.nodes[0]
        print("Could not access start node element_id")
        return None
    except Exception as e:
        print(f"Error retrieving start node: {e}")
        return None
def get_end_node(self, relationship):
    """
    Load the node a relationship points to.

    Parameters
    ----------
    relationship : neo4j.graph.Relationship
        Relationship whose destination node is wanted.

    Returns
    -------
    neo4j.graph.Node or None
        The node as returned by the database, or ``None`` when it cannot be
        found or an error occurs (a message is printed in those cases).

    Notes
    -----
    The node is looked up by the ``element_id`` of ``relationship.end_node``
    using Cypher's ``elementId()``. If that attribute chain is unavailable,
    the second entry of ``relationship.nodes`` is returned instead.
    """
    try:
        element_id = relationship.end_node.element_id
        lookup = "MATCH (n) WHERE elementId(n) = $element_id RETURN n"
        # At most one record is expected for a given element id.
        row = self.run_query(lookup, {"element_id": element_id}).single()
        if not row:
            print(f"No node found with element_id {element_id}")
            return None
        return row['n']
    except AttributeError:
        # No usable element id: fall back to node data carried by the relationship.
        if hasattr(relationship, 'nodes') and len(relationship.nodes) > 1:
            return relationship.nodes[1]
        print("Could not access end node element_id")
        return None
    except Exception as e:
        print(f"Error retrieving end node: {e}")
        return None
def delete_relation_field(self, event):
    """
    Button callback that removes one property from a relationship.

    Parameters
    ----------
    event : object
        Click event. ``event.obj`` must carry ``target`` (the relationship),
        ``field`` (property to remove), ``relation_name`` (relationship type)
        and ``direction`` (``"up"`` refreshes the parent panel, anything else
        the child panel).

    Notes
    -----
    Node UIDs are passed as query parameters. The relationship type and the
    property name cannot be parameters in Cypher, so both are backtick-quoted
    with embedded backticks doubled.
    """
    target = event.obj.target
    name = event.obj.relation_name
    # One lookup per end point; the nodes are reused for the panel refresh below.
    start_node = self.get_start_node(target)
    end_node = self.get_end_node(target)

    query = (
        "match (x {UID:$start_uid})-[r:`" + str(name).replace("`", "``") + "`]->(y {UID:$end_uid}) "
        "remove r.`" + str(event.obj.field).replace("`", "``") + "`"
    )
    self.run_query(query, {"start_uid": start_node['UID'], "end_uid": end_node['UID']})

    if event.obj.direction == "up":
        self.update_upnav(end_node)
    else:
        self.update_downnav(start_node)
    #self.create_carrousel(self.top_UID)
    return
def take_startnode_name(self, elem):
    """
    Sort key for relationships: the ``Name`` of the start node.

    Nodes without a ``Name`` yield ``'zzzz'``, which places them after
    typical lower-case names when sorting.
    """
    source_node = self.get_start_node(elem)
    node_name = source_node.get('Name', 'zzzz')
    return node_name
def take_endnode_name(self, elem):
    """
    Sort key for relationships: the ``Name`` of the end node.

    Nodes without a ``Name`` yield ``'zzzz'``, which places them after
    typical lower-case names when sorting.
    """
    destination_node = self.get_end_node(elem)
    node_name = destination_node.get('Name', 'zzzz')
    return node_name
def update_upnav(self,target):
# Rebuilds the `self.upnav` panel for `target`: one accordion entry per incoming
# relationship (parent object), each with a "Go" button, editors for the relationship
# properties and - with write access - controls to add/remove fields and relations.
# `target` must support target['UID'], target.get('Keys', ...) and target.labels.
#
# All relationships pointing at the target, ordered by the Name of their start node.
upward_links=self.evaluate_query("match (x)-[y]->(t {UID:'"+target['UID']+"'}) return collect(y)")
upward_links.sort(key=self.take_startnode_name)
# Does the target specify grouping?
# NOTE(review): `grouping` is computed here but not read again anywhere in this method.
grouping=self.evaluate_query("match (t {UID:'"+target['UID']+"'}) return t.Subfamily")
if grouping !=None:
grouping=sorted(grouping.split(','))
else:
grouping=None
# Access level of the current user on the target: "W" if any of the target's
# comma-separated Keys is one of the user's write keys, else "R" if any is a read key,
# else None (no access).
target_keys=target.get('Keys','').split(',')
access_state=None
for k in target_keys:
if k in self.current_user.all_write_keys:
access_state="W"
if access_state==None:
for k in target_keys:
if k in self.current_user.all_read_keys:
access_state="R"
# No access at all: show only a message in the panel and stop.
if access_state==None:
self.upnav.clear()
editor=pn.Column()
title="Sorry - you have no access to this item"
html_pane = pn.pane.HTML("<h1>"+title+"</h1>")
editor.append(html_pane)
self.upnav.append(editor)
return
#upward_links=[i[1] for i in upward_links]
#print("upward ",upward_links)
self.upnav.clear()
html_pane = pn.pane.HTML("<h1> Parent objects </h1>")
self.upnav.append(html_pane)
# tab_items collects [title, content] pairs, one per displayed parent relationship.
tab_items=[]
accordion_position=0
for l in upward_links:
# Labels of the parent node; Template parents are shown without a name.
# NOTE(review): get_start_node(l) is called several times per link and each call
# queries the database.
label=list(self.get_start_node(l).labels)
if "Template" in label:
name=''
else:
name=self.get_start_node(l).get('Name','')
try:
label.remove('Template')
except:
pass
# Access level of the current user on the parent node, same rule as above.
target_keys=self.get_start_node(l).get('Keys','').split(',')
up_access_state=None
for k in target_keys:
if k in self.current_user.all_write_keys:
up_access_state="W"
if up_access_state==None:
for k in target_keys:
if k in self.current_user.all_read_keys:
up_access_state="R"
# The relationship is editable only when both ends are writable.
if access_state=='W' and up_access_state=='W':
local_access_state='W'
else:
local_access_state='R'
#print("label :",label,l, type(l))
# Parents the user cannot read, or that have no label left, are not listed.
if label!=[] and up_access_state!=None:
keys=list(l.keys())
go_bt=pn.widgets.Button(name='Go',button_type='primary')
go_bt.UID=self.get_start_node(l)['UID']
go_bt.on_click(self.go_to_new)
fields=list(l.keys())
accordion_card=pn.Column()
accordion_card.append(go_bt)
editor=pn.Column()
# NOTE(review): this list is re-created for every link, so it only ever holds the
# watchers of the link processed last.
self.watcher_collector_up=[]
html_pane = pn.pane.HTML("<h1>"+str(l.type)+"</h1>")
relation_name=str(l.type)
editor.append(html_pane)
if 'Template' in target.labels:
# Template targets: every relationship property is a plain text input, with
# "Edit" (JSON editor) and "DEL" buttons when writable.
edit_position=1
for f in fields:
if local_access_state=='W':
w=pn.widgets.TextInput(name=f,value=l[f],disabled=False)
else:
w=pn.widgets.TextInput(name=f,value=l[f],disabled=True)
w.field=f
w.target=l
w.modus='text'
w.position=[accordion_position,edit_position]
# NOTE(review): unlike the non-template branch below, `w.relation_name` is not set
# here, yet save_relationship reads `event.obj.relation_name` - verify this path.
if local_access_state=='W':
b=pn.widgets.Button(name="Edit",button_type="success")
b.position=[accordion_position,edit_position]
b.on_click(self.open_json_editor)
b.direction="up"
b.relation_name=relation_name
d=pn.widgets.Button(name="DEL",button_type="danger")
d.field=f
d.target=l
d.direction="up"
d.relation_name=relation_name
d.on_click(self.delete_relation_field)
edit_position=edit_position+1
#self.editor_map[f]=[w,target[f],False]
# Persist edits as soon as the widget value changes.
self.watcher_collector_up.append(w.param.watch(self.save_relationship, ['value'], onlychanged=True))
#w.param.watch(self.save_object, ['value'], onlychanged=True)
if local_access_state=='W':
editor.append(pn.Row(w,b,d))
else:
editor.append(w)
else:
# Regular targets: choose_widget picks the widget for each property and reports
# whether the field is a free field (only free fields get a "DEL" button).
for f in fields:
w,modus,free_field,tab=self.choose_widget(l,f,local_access_state)
w.field=f
w.target=l
w.modus=modus
w.relation_name=relation_name
self.watcher_collector_up.append(w.param.watch(self.save_relationship, ['value'], onlychanged=True))
if free_field and local_access_state=='W':
d=pn.widgets.Button(name="DEL",button_type="danger")
d.field=f
d.target=l
d.direction="up"
d.relation_name=relation_name
d.on_click(self.delete_relation_field)
editor.append(pn.Row(w,d))
else:
editor.append(w)
accordion_card.append(editor)
if local_access_state=='W':
# "Add new field" form: the two text inputs are linked to attributes on the
# button, which add_new_relation_field reads when clicked.
bt = pn.widgets.Button(name='Add new field', button_type='primary',styles={'background':'lightgrey'}, width=300)
add_label = pn.widgets.TextInput(name='New field',value='')
add_value = pn.widgets.TextInput(name='New value',value='')
add_label.link(bt, value='New_label')
add_value.link(bt, value='New_value')
bt.New_label=''
bt.New_value=''
bt.target = l
bt.relation_name=relation_name
bt.on_click(self.add_new_relation_field)
bt.direction="up"
newcol = pn.Column(add_label, add_value,bt,styles={'background':'lightgrey'})
accordion_card.append(newcol)
# Relationship removal - dumb implemntation so far
bt = pn.widgets.Button(name='Remove relation', button_type='danger',styles={'background':'red'}, width=300)
bt.on_click(self.remove_relation)
bt.target = l
bt.direction="up"
bt.relation_name=relation_name
accordion_card.append(bt)
# Accordion title has the form "<label>:<name>-<RELATION TYPE>->".
tab_items.append([label[0]+":"+name+"-"+l.type.upper()+"->",accordion_card])
accordion_position=accordion_position+1
if tab_items!=[]:
expander=pn.Accordion(toggle=True,css_classes=['CPath-accordion1'])
for tab in tab_items:
expander.append((tab[0],tab[1]))
# NOTE(review): `up_access_state` is the value left over from the last link of the loop
# above (and is undefined if there were no links); update_downnav uses the target's own
# `access_state` for the equivalent check - confirm which one is intended here.
if up_access_state=='W':
if 'Template' in target.labels:
label=list(target.labels)
try:
label.remove('Template')
except:
pass
# Template targets: free-text link type; the search is limited to Template nodes.
add_node=pn.widgets.TextInput(name='New link type',value='')
bt = pn.widgets.Button(name='Add new link', button_type='primary',styles={'background':'lightgreen'}, width=300)
bt.New_rel=''
bt.target = target
def make_callstring(e):
# `handback` is a string of Python source handed to display_search - presumably
# evaluated there once a search result row is picked; verify in display_search.
if e.obj.New_rel!='':
filter="WHERE 'Template' in labels(x) "
handback="self.link_to_object(row['UID'],'"+e.obj.target['UID']+"','"+e.obj.New_rel+"','up',True)"
self.display_search(None,filter,handback)
bt.on_click(make_callstring)
add_node.link(bt, value='New_rel')
newcol = pn.Column(add_node,bt,styles={'background':'lightgreen'})
self.upnav.append(newcol)
else:
label=list(target.labels)
try:
label.remove('Template')
except:
pass
template=self.template_tree[label[0]]
if template!=None:
# Do we have a value for this field?
uplinks=template['uplinks']
#print("uplinks :",uplinks)
if uplinks!=[]:
# Each option is "<l[1]>:<l[0]>"; make_callstring below uses the part after ':' as
# the node label to search for and the part before ':' as the relationship name.
options=[]
for l in uplinks:
options.append(l[1]+":"+l[0])
add_node=pn.widgets.Select(name='New link type',options=options,value=options[0])
bt = pn.widgets.Button(name='Add new link', button_type='primary',styles={'background':'lightgreen'}, width=300)
bt.New_rel=options[0]
bt.target = target
def make_callstring(e):
filter="WHERE '"+e.obj.New_rel.split(':')[1]+"' in labels(x) "
handback="self.link_to_object(row['UID'],'"+e.obj.target['UID']+"','"+e.obj.New_rel.split(':')[0]+"','up')"
self.display_search(None,filter,handback)
bt.on_click(make_callstring)
add_node.link(bt, value='New_rel')
newcol = pn.Column(add_node,bt,styles={'background':'lightgreen'})
self.upnav.append(newcol)
# NOTE(review): `expander` only exists when tab_items was non-empty - confirm this
# statement is guarded accordingly.
self.upnav.append(expander)
#Adding a box to create new links
return
def update_downnav(self,target):
# Rebuilds the `self.downnav` panel for `target`: controls to add links/children (with
# write access) followed by one accordion entry per outgoing relationship (child object).
# `target` must support target['UID'], target.get('Keys', ...) and target.labels.
#
# All relationships leaving the target, ordered by the Name of their end node.
downward_links=self.evaluate_query("match (x)<-[y]-(t {UID:'"+target['UID']+"'}) return collect(y) ")
downward_links.sort(key=self.take_endnode_name)
# Optional grouping: the target's comma-separated `Subfamily` property, sorted. It is
# used at the end of this method to group children by name prefix.
grouping=self.evaluate_query("match (t {UID:'"+target['UID']+"'}) return t.Subfamily")
if grouping !=None:
grouping=sorted(grouping.split(','))
else:
grouping=None
#print("downward ",downward_links)
# Access level of the current user on the target: "W" if any of the target's
# comma-separated Keys is one of the user's write keys, else "R" if any is a read key,
# else None (no access).
target_keys=target.get('Keys','').split(',')
access_state=None
for k in target_keys:
if k in self.current_user.all_write_keys:
access_state="W"
if access_state==None:
for k in target_keys:
if k in self.current_user.all_read_keys:
access_state="R"
# No access at all: show only a message in the panel and stop.
if access_state==None:
self.downnav.clear()
editor=pn.Column()
title="Sorry - you have no access to this item"
html_pane = pn.pane.HTML("<h1>"+title+"</h1>")
editor.append(html_pane)
self.downnav.append(editor)
return
# Debug output to stdout.
print("downwards access state ",access_state)
self.downnav.clear()
html_pane = pn.pane.HTML("<h1> Child objects </h1>")
self.downnav.append(html_pane)
# --- "Add new link" form (link the target to an existing object) ---
if access_state=='W':
if 'Template' in target.labels:
label=list(target.labels)
try:
label.remove('Template')
except:
pass
# Template targets: free-text link type; the search is limited to Template nodes.
add_node=pn.widgets.TextInput(name='New link type',value='')
bt = pn.widgets.Button(name='Add new link', button_type='primary',styles={'background':'lightgreen'}, width=300)
bt.New_rel=''
bt.target = target
def make_callstring(e):
# `handback` is a string of Python source handed to display_search - presumably
# evaluated there once a search result row is picked; verify in display_search.
if e.obj.New_rel!='':
filter="WHERE 'Template' in labels(x) "
handback="self.link_to_object('"+e.obj.target['UID']+"',row['UID'],'"+e.obj.New_rel+"','down',True)"
self.display_search(None,filter,handback,"labels(x)")
bt.on_click(make_callstring)
add_node.link(bt, value='New_rel')
newcol = pn.Column(add_node,bt,styles={'background':'lightgreen'})
self.downnav.append(newcol)
else:
label=list(target.labels)
try:
label.remove('Template')
except:
pass
template=self.template_tree[label[0]]
if template!=None:
# Do we have a value for this field?
downlinks=template['downlinks']
#print("uplinks :",downlinks)
if downlinks!=[]:
# Each option is "<l[1]>:<l[0]>"; make_callstring below uses the part after ':' as
# the node label to search for and the part before ':' as the relationship name.
options=[]
for l in downlinks:
options.append(l[1]+":"+l[0])
add_node=pn.widgets.Select(name='New link type',options=options,value=options[0])
bt = pn.widgets.Button(name='Add new link', button_type='primary',styles={'background':'lightgreen'}, width=300)
bt.New_rel=options[0]
bt.target = target
def make_callstring(e):
filter="WHERE '"+e.obj.New_rel.split(':')[1]+"' in labels(x) "
handback="self.link_to_object('"+e.obj.target['UID']+"',row['UID'],'"+e.obj.New_rel.split(':')[0]+"','down')"
self.display_search(None,filter,handback)
bt.on_click(make_callstring)
add_node.link(bt, value='New_rel')
newcol = pn.Column(add_node,bt,styles={'background':'lightgreen'})
self.downnav.append(newcol)
# Option to add a new child from template to the down side
# --- "Add new child" form (create a new object below the target) ---
if access_state=='W':
if 'Template' in target.labels:
# Template targets: child name and relation are free text; bt.template=True.
add_node=pn.widgets.TextInput(name='New child',value='')
add_relation=pn.widgets.TextInput(name='New relation',value='')
bt = pn.widgets.Button(name='Add new child', button_type='primary',styles={'background':'lightgreen'}, width=300)
bt.New_node=''
bt.New_relation=''
bt.target = target
bt.UID=target['UID']
bt.template=True
bt.on_click(self.add_new_child)
add_node.link(bt, value='New_node')
add_relation.link(bt, value='New_relation')
newcol = pn.Column(add_node,add_relation,bt,styles={'background':'lightgreen'})
self.downnav.append(newcol)
else:
label=list(target.labels)
try:
label.remove('Template')
except:
pass
template=self.template_tree[label[0]]
if template!=None:
# Do we have a value for this field?
downlinks=template['downlinks']
#print("downlinks :",downlinks)
if downlinks!=[]:
# Options here are "<l[0]>:<l[1]>" - the reverse order of the "Add new link"
# options above.
options=[]
for l in downlinks:
options.append(l[0]+":"+l[1])
add_node=pn.widgets.Select(name='Allowed new child',options=options,value=options[0])
bt = pn.widgets.Button(name='Add new child', button_type='primary',styles={'background':'lightgreen'}, width=300)
bt.New_node=options[0]
bt.target = target
bt.UID=target['UID']
bt.template=False
bt.on_click(self.add_new_child)
add_node.link(bt, value='New_node')
newcol = pn.Column(add_node,bt,styles={'background':'lightgreen'})
self.downnav.append(newcol)
# --- One accordion card per child relationship ---
# tab_items collects [title, content, child name] triples.
tab_items=[]
accordion_position=0
for l in downward_links:
# Labels of the child node; Template children are shown without a name.
# NOTE(review): get_end_node(l) is called several times per link and each call
# queries the database. Unlike update_upnav, no access check is made on the child
# node itself; all editors below use the target's `access_state`.
label=list(self.get_end_node(l).labels)
if "Template" in label:
name=''
else:
name=self.get_end_node(l).get('Name','')
try:
label.remove('Template')
except:
pass
keys=list(l.keys())
go_bt=pn.widgets.Button(name='Go',button_type='primary')
go_bt.UID=self.get_end_node(l)['UID']
go_bt.on_click(self.go_to_new)
fields=list(l.keys())
# Shopping-cart icon button that hands the child to add_to_cart.
svg="""<svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round"
stroke-linejoin="round" class="icon icon-tabler icons-tabler-outline icon-tabler-shopping-cart-plus"><path stroke="none" d="M0 0h24v24H0z"
fill="none"/><path d="M4 19a2 2 0 1 0 4 0a2 2 0 0 0 -4 0" /><path d="M12.5 17h-6.5v-14h-2" /><path d="M6 5l14 1l-.86 6.017m-2.64 .983h-10.5" /><path d="M16 19h6" /><path d="M19 16v6" /></svg>"""
bt = pn.widgets.ButtonIcon(icon=svg,active_icon=svg, styles={'background':'lightgreen'}, size="4em")
# NOTE(review): here `target` is a UID string, while the cart button built for the
# main view box stores the node object itself - confirm add_to_cart handles both.
bt.target=self.get_end_node(l)['UID']
bt.on_click(self.add_to_cart)
accordion_card=pn.Column()
accordion_card.append(pn.Row(go_bt,bt))
editor=pn.Column()
# NOTE(review): this list is re-created for every link, so it only ever holds the
# watchers of the link processed last.
self.watcher_collector_down=[]
html_pane = pn.pane.HTML("<h1>"+str(l.type)+"</h1>")
relation_name=str(l.type)
editor.append(html_pane)
if 'Template' in target.labels:
# Template targets: every relationship property is a plain text input, with
# "Edit" (JSON editor) and "DEL" buttons when writable.
edit_position=1
for f in fields:
if access_state=='W':
w=pn.widgets.TextInput(name=f,value=l[f],disabled=False)
else:
w=pn.widgets.TextInput(name=f,value=l[f],disabled=True)
w.field=f
w.target=l
w.modus='text'
w.position=[accordion_position,edit_position]
w.relation_name=relation_name
if access_state=='W':
b=pn.widgets.Button(name="Edit",button_type="success")
b.position=[accordion_position,edit_position]
b.direction="down"
b.relation_name=relation_name
b.on_click(self.open_json_editor)
d=pn.widgets.Button(name="DEL",button_type="danger")
d.field=f
d.target=l
d.direction="down"
d.relation_name=relation_name
d.on_click(self.delete_relation_field)
# The chained assignment is redundant; the effect is edit_position += 1.
edit_position=edit_position=edit_position+1
#self.editor_map[f]=[w,target[f],False]
# NOTE(review): this appends to `watcher_collector_up`, the list owned by
# update_upnav, not to `watcher_collector_down` created above - looks like a
# copy/paste slip; it also fails if update_upnav has never created that list.
self.watcher_collector_up.append(w.param.watch(self.save_relationship, ['value'], onlychanged=True))
#w.param.watch(self.save_object, ['value'], onlychanged=True)
if access_state=='W':
editor.append(pn.Row(w,b,d))
else:
editor.append(w)
else:
# Regular targets: choose_widget picks the widget for each property and reports
# whether the field is a free field (only free fields get a "DEL" button).
for f in fields:
w,modus,free_field,tab=self.choose_widget(l,f,access_state)
w.field=f
w.target=l
w.modus=modus
w.relation_name=relation_name
self.watcher_collector_down.append(w.param.watch(self.save_relationship, ['value'], onlychanged=True))
if free_field and access_state=='W':
d=pn.widgets.Button(name="DEL",button_type="danger")
d.field=f
d.target=l
d.direction="down"
d.relation_name=relation_name
d.on_click(self.delete_relation_field)
editor.append(pn.Row(w,d))
else:
editor.append(w)
accordion_card.append(editor)
if access_state=='W':
# "Add new field" form: the two text inputs are linked to attributes on the
# button, which add_new_relation_field reads when clicked.
bt = pn.widgets.Button(name='Add new field', button_type='primary',styles={'background':'lightgrey'}, width=300)
add_label = pn.widgets.TextInput(name='New field',value='')
add_value = pn.widgets.TextInput(name='New value',value='')
add_label.link(bt, value='New_label')
add_value.link(bt, value='New_value')
bt.New_label=''
bt.New_value=''
bt.target = l
bt.relation_name=relation_name
bt.on_click(self.add_new_relation_field)
bt.direction="down"
newcol = pn.Column(add_label, add_value,bt,styles={'background':'lightgrey'})
accordion_card.append(newcol)
# Relationship removal - dumb implemntation so far
bt = pn.widgets.Button(name='Remove relation', button_type='danger',styles={'background':'red'}, width=300)
bt.on_click(self.remove_relation)
bt.target = l
bt.direction="down"
bt.relation_name=relation_name
accordion_card.append(bt)
# Accordion title has the form "-<RELATION TYPE>-><label>:<name>"; the bare name is
# kept as third element for the grouping step below.
tab_items.append(["-"+l.type.upper()+"->"+label[0]+":"+name,accordion_card,name])
accordion_position=accordion_position+1
# --- Assemble the accordion(s) ---
# An empty accordion is created up front; both branches below create a fresh one again.
expander=pn.Accordion(toggle=True,css_classes=['CPath-accordion1'])
if tab_items!=[]:
# Debug output to stdout.
print("grouping",grouping)
if grouping!=None:
# Grouped layout: one sub-accordion per Subfamily entry plus a final one that is
# shown as "Other". A child goes into the first group its name starts with.
expander=pn.Accordion(toggle=True,css_classes=['CPath-accordion1'])
accordion_set=[]
for g in grouping:
accordion_set.append(pn.Accordion(toggle=True,css_classes=['CPath-accordion1']))
accordion_set.append(pn.Accordion(toggle=True,css_classes=['CPath-accordion1']))
for tab in tab_items:
found=False
for i,g in enumerate(grouping):
if tab[2].startswith(g):
accordion_set[i].append((tab[0],tab[1]))
found=True
break
if not found:
accordion_set[-1].append((tab[0],tab[1]))
for i,g in enumerate(grouping):
expander.append((g,accordion_set[i]))
expander.append(("Other",accordion_set[-1]))
else:
expander=pn.Accordion(toggle=True,css_classes=['CPath-accordion1'])
if len(tab_items)>25:
# Ungrouped layout with many children: pages of 25 entries each, titled
# "<first>...<last>", followed by one page for the remainder.
cuts=len(tab_items)//25
for i in range(cuts):
sub_accordion=pn.Accordion(toggle=True,css_classes=['CPath-accordion1'])
for x in range(25*i,25*(i+1)):
sub_accordion.append((tab_items[x][0],tab_items[x][1]))
expander.append((str(25*i)+"..."+str(25*(i+1)),sub_accordion))
# NOTE(review): the remainder page reuses `i` from the loop above; when the number
# of children is an exact multiple of 25 the remainder range is empty, so an empty
# page would be added - confirm against the intended nesting.
sub_accordion=pn.Accordion(toggle=True,css_classes=['CPath-accordion1'])
for x in range(25*(i+1),len(tab_items)):
sub_accordion.append((tab_items[x][0],tab_items[x][1]))
expander.append((str(25*(i+1))+"...",pn.Column(sub_accordion,height_policy='fit')))
else:
for tab in tab_items:
expander.append((tab[0],tab[1]))
self.downnav.append(pn.Column(expander,height_policy='fit'))
#Adding a box to create new links
return
def _get_json_data(self, topnode) -> list:
    """
    Build the flat node list for the navigation tree below ``topnode``.

    Parameters
    ----------
    topnode : node object
        Root of the tree. Must provide ``labels`` and item access for the
        ``'UID'`` and ``'N'`` properties (it is not a plain string, despite
        the former annotation).

    Returns
    -------
    list
        The root entry followed by the entries returned by one query per
        depth level (1 to 6 outgoing hops from the root). Each entry is a
        map with ``text``, ``type``, ``id``, ``parent`` and ``state`` keys.

    Notes
    -----
    Two layouts exist:

    * Configuration roots: ``id``/``parent`` are plain UIDs, ``text`` is the
      first label (the full label list on level 1), levels 1-2 start opened.
    * All other roots: ``text`` is ``labels + '-' + Name``, ``id`` is
      ``<UID>:<parent UID>`` and ``parent`` is the parent's own composite id,
      only level 1 starts opened.

    The queries are generated per level instead of being written out twelve
    times; the generated statements are the same as the hand-written ones.
    """
    is_configuration = "Configuration" in topnode.labels
    root_uid = topnode['UID']
    # Variable names used along the path: x is the root, a..f the six levels.
    chain = ["x", "a", "b", "c", "d", "e", "f"]

    data = [{"text": topnode['N'],
             "id": root_uid,
             "parent": "#",
             "type": topnode['N'],
             "state": {"opened": True}
             }]

    for depth in range(1, len(chain)):
        node = chain[depth]
        parent = chain[depth - 1]
        path = "-->".join("(" + var + ")" for var in chain[1:depth + 1])

        if is_configuration:
            # Level 1 shows the whole label list, deeper levels only the first label.
            text = f"labels({node})" if depth == 1 else f"labels({node})[0]"
            node_id = f"{node}.UID"
            parent_id = f"{parent}.UID"
            opened = depth <= 2
        else:
            text = f"labels({node})+'-'+{node}.Name"
            node_id = f"{node}.UID+':'+{parent}.UID"
            # The parent reference must equal the id the parent row was given one
            # level up; the root row uses its bare UID.
            if depth == 1:
                parent_id = "x.UID"
            else:
                parent_id = f"{parent}.UID+':'+{chain[depth - 2]}.UID"
            opened = depth == 1

        query = (
            "\n"
            f"MATCH (x {{UID:'{root_uid}'}})-->{path}\n"
            f"WITH {','.join(chain[:depth + 1])}\n"
            f"ORDER BY labels({node})[0]\n"
            "RETURN COLLECT(DISTINCT {\n"
            f"text: {text}, type:'{node}',id:{node_id}, parent:{parent_id}, "
            f"state:{{disabled:false,opened:{'true' if opened else 'false'}}}\n"
            "})\n"
        )
        # A missing result is treated as "no rows" rather than an error.
        data.extend(self.evaluate_query(query) or [])
    return data
def create_carrousel(self, top_UID):
    """
    Refill the sidebar for the object identified by ``top_UID``.

    The sidebar is always cleared first. Nodes labelled ``Template``,
    ``Configuration`` or ``People`` are opened directly in the view box and
    leave the sidebar empty. For every other node a navigation tree is built
    from ``_get_json_data`` and added to the sidebar; clicking a tree entry
    opens the object whose UID is the part of the entry id before the first
    ``':'``.

    Parameters
    ----------
    top_UID : str
        UID of the node to use as the root.
    """
    self.sidebar.clear()
    top_node = self.evaluate_query(f"MATCH (s {{UID:'{top_UID}'}}) RETURN s")

    direct_view_labels = ("Template", "Configuration", "People")
    if any(tag in top_node.labels for tag in direct_view_labels):
        self.create_viewbox(top_UID)
        return

    tree_data = self._get_json_data(top_node)
    self.navigation_tree = JSTree_json(data=tree_data)

    def navigation_tree_click(e):
        selected_uid = e.obj.selected.split(':')[0]
        self.create_viewbox(selected_uid)

    self.navigation_tree.on_click(navigation_tree_click)
    self.sidebar.append(self.navigation_tree)
def chat_viewer(self, event=None):
    """
    Empty the workspace and build the chat box in it.

    Parameters
    ----------
    event : object | Nonetype
        Click event of the calling widget; it is not used
    """
    self.workspace.clear()
    self.create_chatbox()
def tree_viewer(self, event=None, first=True):
    """
    Reset the workspace and, on a first run, open the top node of the chosen data class.

    On a first run the UID of the node labelled with the calling widget's
    ``usermode`` is looked up, stored in ``self.top_UID`` and shown through
    ``create_viewbox``. On any later run the workspace is only cleared.

    Parameters
    ----------
    event : object | Nonetype
        Click event. When given, the session arguments are cleared,
        ``event.obj.first`` decides whether this is a first run and
        ``event.obj.usermode`` names the node label to open
    first : bool
        Used as the first-run flag only when ``event`` is None

    Raises
    ------
    ValueError
        If a first run is requested without an event, because the data class
        can only be read from ``event.obj.usermode``
    """
    if event is None:
        first_run = first
        if first_run:
            # This path used to clear the workspace and then fail with an
            # AttributeError on ``None.obj``; fail early and leave the UI intact.
            raise ValueError("tree_viewer needs an event carrying 'usermode' on a first run")
    else:
        pn.state.session_args.clear()
        first_run = event.obj.first

    # Both the first and the later runs start from an empty workspace.
    # (The former search_page flow is no longer built here.)
    self.workspace.clear()
    if not first_run:
        return

    # A node label cannot be sent as a Cypher query parameter, so it is
    # spliced into the query text; usermode is set in code on the start buttons.
    usermode = str(event.obj.usermode)
    self.top_UID = self.evaluate_query("match (x:" + usermode + ") return x.UID")
    self.create_viewbox(self.top_UID)
    return
def direct_start_app(self, event=None):
    """
    Shows the home screen of the app

    The workspace is emptied and filled with a centered column holding a
    heading and one button per data class, plus the "Ask-Me" and
    "Controlled Documents" buttons. Afterwards the cart stored on the
    current user's node is loaded into ``self.shopping_cart`` when present.

    Parameters
    ----------
    event : object | Nonetype
        Click event of the calling widget; it is not used
    """
    # Local import: only needed to hand the cart JSON to pandas as a buffer
    from io import StringIO

    self.workspace.clear()
    select_box = pn.Column(margin=(30, 30), width=400, align="center")
    select_box.append(pn.pane.HTML(
        '\n<h1 style="color: #aa64cf; margin: 20px 20px 20px 40px; text-align: center;">'
        ' Which data class do you want to work on?</h1>\n'
    ))

    # (button caption, usermode tag read by tree_viewer, click handler).
    # A usermode of None means the button carries no usermode/first attributes.
    start_buttons = (
        ("People", "People", self.tree_viewer),
        ("Customers", "Customers", self.tree_viewer),
        ("Projects", "Projects", self.tree_viewer),
        ("Compounds", "Compounds", self.tree_viewer),
        ("Library", "Library", self.tree_viewer),
        ("Configuration", "Configuration", self.tree_viewer),
        ("Ask-Me", "Chat", self.chat_viewer),
        ("Controlled Documents", None, self.open_controlled_docs),
    )
    for caption, usermode, handler in start_buttons:
        button = pn.widgets.Button(
            name=caption,
            button_type='primary',
            height=100,
            css_classes=['CPathBtn2'],
            max_width=400,
            align='center',
        )
        if usermode is not None:
            button.usermode = usermode
            button.first = True
        button.on_click(handler)
        select_box.append(button)

    self.workspace.append(pn.Row(pn.layout.HSpacer(), select_box, pn.layout.HSpacer(), sizing_mode="stretch_width"))

    # Fill the shopping cart if available
    result = self.evaluate_query("match (u:User {UID:'" + self.current_user.UID + "'}) return u.cart")
    if result is not None:
        # pandas deprecated passing a literal JSON string to read_json; wrap it in a buffer
        self.shopping_cart = pd.read_json(StringIO(result))
        self.shopping_cart.index = range(len(self.shopping_cart))
        print("shopping cart ", self.shopping_cart)
    return
def open_controlled_docs(self, event=None):
    """Launch the controlled documents system and pass user session

    A ``ControlledDocumentApp`` is created in integrated mode, receives the
    current user and its view is placed in the workspace below a button that
    leads back to the start screen. The app instance is kept on
    ``self._cdocs_app``. Any exception is logged and shown as an error
    message together with a back button.

    Parameters
    ----------
    event : object | Nonetype
        Click event of the calling button; it is not used
    """
    self.loading_indicator.name = "Loading CDocs..."
    self.loading_indicator.value = True
    try:
        self.workspace.clear()
        self.workspace.append(pn.pane.Markdown("# Loading Controlled Documents System..."))

        first_load = 'CDocs' not in sys.modules
        if first_load:
            # CDocs was not imported in this process yet: execute its main module from its file first
            import importlib.util
            spec = importlib.util.spec_from_file_location("CDocs.main", "/tf/active/CDocs/main.py")
            cdocs_module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(cdocs_module)

        # A new app instance is created on every call
        from CDocs.main import ControlledDocumentApp
        cdocs_app = ControlledDocumentApp()
        # The environment attribute is set before the integration is initialized
        cdocs_app.environment = "Integrated"
        try:
            cdocs_app.initialize_integration(integration_mode=True)
        except TypeError:
            # integration_mode keyword not accepted: initialize without it and set the flag by hand
            cdocs_app.initialize_integration()
            setattr(cdocs_app, 'integration_mode', True)

        # The full user object is handed over, not just single attributes
        user_imported = cdocs_app.import_user_from_datacapture(self.current_user)
        if not first_load:
            status_message = "Connected to CDocs instance"
        elif user_imported:
            status_message = "User session successfully transferred"
        else:
            status_message = "Warning: User session could not be transferred"

        # Swap the loading message for the CDocs interface, preferring the most complete view
        self.workspace.clear()
        if hasattr(cdocs_app, 'get_integrated_view'):
            view = cdocs_app.get_integrated_view()
            self.workspace.append(view)
        elif hasattr(cdocs_app, 'template') and cdocs_app.template is not None:
            self.workspace.append(cdocs_app.template)
        else:
            # Last resort - just the main content under a header
            self.workspace.append(pn.Column(
                pn.pane.Markdown(f"# Controlled Document Management System\n\n{status_message}"),
                cdocs_app.main_content
            ))

        # Keep a reference so the app instance is not garbage collected
        self._cdocs_app = cdocs_app

        return_btn = pn.widgets.Button(name="← Return to DataCapture", button_type="primary", width=200)
        return_btn.on_click(self.direct_start_app)
        # The return button goes on top of whatever was placed in the workspace
        if len(self.workspace) == 0:
            self.workspace.append(return_btn)
        else:
            self.workspace.insert(0, return_btn)

        self.loading_indicator.name = "Idle..."
        self.loading_indicator.value = False
    except Exception as exc:
        self.loading_indicator.name = "Error"
        self.loading_indicator.value = False
        error_message = f"Error loading Controlled Documents System: {exc!s}"
        import traceback
        self.log.error(traceback.format_exc())
        # Replace the workspace content with the error and a way back
        self.workspace.clear()
        self.workspace.append(pn.Column(
            pn.pane.Markdown(f"# Error Loading Controlled Documents\n\n{error_message}"),
            pn.widgets.Button(name="Back to DataCapture", button_type="primary", on_click=self.direct_start_app)
        ))
        self.log.error(error_message)
def _options_page(self, *args, **kwargs):
    """
    Shows the options page of the app

    The workspace is emptied, the access is logged and the body of a new
    ``options`` page is appended. Positional and keyword arguments are
    accepted but not used.
    """
    self.workspace.clear()
    self.log.info("Accessing options page")
    page = options(self.graph, self.log, self.current_user)
    self.workspace.append(page.body)
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| bases | param.Parameterized | - | |
Parameter Details
bases: the base class of pathobrowser_base (param.Parameterized); the constructor itself takes no arguments
Return Value
Returns unspecified type
Class Interface
Methods
__init__(self)
Purpose: Internal method: init
Returns: None
__del__(self)
Purpose: Close database connections when the object is destroyed
Returns: None
chat_toggle(self, event)
Purpose: Performs chat toggle
Parameters:
event: Parameter
Returns: None
update_instruction_template(self, template_name)
Purpose: Update the detailed instructions based on selected template
Parameters:
template_name: Parameter
Returns: None
save_instruction_template(self, event)
Purpose: Save current instructions as a custom template
Parameters:
event: Parameter
Returns: None
load_instruction_template(self, event)
Purpose: Load and apply selected instruction template
Parameters:
event: Parameter
Returns: None
update_access_tokens(self, UID)
Purpose: Updates access tokens
Parameters:
UID: Parameter
Returns: None
load_template_tree(self)
Purpose: Performs load template tree
Returns: None
safe_float(self, x)
Purpose: Performs safe float
Parameters:
x: Parameter
Returns: None
safe_int(self, x)
Purpose: Performs safe int
Parameters:
x: Parameter
Returns: None
_create_header_items(self)
Purpose: Internal method: create header items
Returns: None
generate_welcome(self)
Purpose: Generates the welcome screen if the user is logged in through O365 or pops up the login screen and waits for it
Returns: None
deferred_login(self, event)
Purpose: processes the login if a "go" comes from the login screen (from the user class) Parameters ---------- event : object Event object where the new property contains a str
Parameters:
event: Parameter
Returns: None
show_user_prefs(self, event)
Purpose: Shows modal with user preferences and files
Parameters:
event: Parameter
Returns: None
user_logout(self, event)
Purpose: Performs user logout
Parameters:
event: Parameter
Returns: None
init_connections(self)
Purpose: Performs init connections
Returns: None
run_query(self, query, params)
Purpose: Execute a Cypher query and return the result Parameters ---------- query : str The Cypher query to execute params : dict, optional Parameters for the query Returns ------- result The query result
Parameters:
query: Parameter
params: Parameter
Returns: See docstring for return details
evaluate_query(self, query, params)
Purpose: Execute a Cypher query and return a single result Parameters ---------- query : str The Cypher query to execute params : dict, optional Parameters for the query Returns ------- object The single result value
Parameters:
query: Parameter
params: Parameter
Returns: See docstring for return details
push_changes(self, node)
Purpose: Push changes to a node to the database Parameters ---------- node : dict or node-like object Node with properties to update
Parameters:
node: Parameter
Returns: None
where_is_file(self, UID)
Purpose: Find file location Parameters ---------- UID : str The UID of the file to find Returns ------- store : str The filepath to the image file
Parameters:
UID: Parameter
Returns: See docstring for return details
action_from_search(self, event)
Purpose: Event handler. Gets called when the output_uid parameter of the carrousel changes, and goes to the respective page Parameters ---------- event : object Event object containing a UID in the event.new property, either matching a study (QC) or parblock (view)
Parameters:
event: Parameter
Returns: None
choose_widget(self, target, f, access_state)
Purpose: Performs choose widget
Parameters:
target: Parameter
f: Parameter
access_state: Parameter
Returns: None
choose_filter_widget(self, label, f, value)
Purpose: Performs choose filter widget
Parameters:
label: Parameter
f: Parameter
value: Parameter
Returns: None
save_object(self, event)
Purpose: Performs save object
Parameters:
event: Parameter
Returns: None
add_new_field(self, event)
Purpose: Performs add new field
Parameters:
event: Parameter
Returns: None
add_new_child(self, event)
Purpose: Performs add new child
Parameters:
event: Parameter
Returns: None
link_to_object(self, startnode, endnode, relation, direction, is_template)
Purpose: Performs link to object
Parameters:
startnode: Parameter
endnode: Parameter
relation: Parameter
direction: Parameter
is_template: Parameter
Returns: None
remove_node(self, event)
Purpose: Deletes node
Parameters:
event: Parameter
Returns: None
delete_field(self, event)
Purpose: Deletes field
Parameters:
event: Parameter
Returns: None
find_edges(self, UID)
Purpose: Performs find edges
Parameters:
UID: Parameter
Returns: None
takesecondkey(self, elem)
Purpose: Performs takesecondkey
Parameters:
elem: Parameter
Returns: None
make_expspace_links(self, event)
Purpose: Creates expspace links
Parameters:
event: Parameter
Returns: None
button_row_generate(self, title, objtype, startpoint)
Purpose: Performs button row generate
Parameters:
title: Parameter
objtype: Parameter
startpoint: Parameter
Returns: None
remove_from_cart(self, event)
Purpose: Removes from cart
Parameters:
event: Parameter
Returns: None
report_from_cart(self, event)
Purpose: Performs report from cart
Parameters:
event: Parameter
Returns: None
push_keyword(self, event)
Purpose: Performs push keyword
Parameters:
event: Parameter
Returns: None
hint_keyword(self, event)
Purpose: Performs hint keyword
Parameters:
event: Parameter
Returns: None
display_report(self, event)
Purpose: Performs display report
Parameters:
event: Parameter
Returns: None
filter_update(self, event)
Purpose: Performs filter update
Parameters:
event: Parameter
Returns: None
filter_update_chat(self, event)
Purpose: Performs filter update chat
Parameters:
event: Parameter
Returns: None
categories_update(self, event)
Purpose: Performs categories update
Parameters:
event: Parameter
Returns: None
categories_update_chat(self, event)
Purpose: Performs categories update chat
Parameters:
event: Parameter
Returns: None
convert_data(self, data, datatype)
Purpose: Performs convert data
Parameters:
data: Parameter
datatype: Parameter
Returns: None
extend_tree(self, start, target, data_type)
Purpose: Performs extend tree
Parameters:
start: Parameter
target: Parameter
data_type: Parameter
Returns: None
generate_sample_report(self, event)
Purpose: Performs generate sample report
Parameters:
event: Parameter
Returns: None
retrieve_internal_data(self)
Purpose: Performs retrieve internal data
Returns: None
return_selected_keywords(self)
Purpose: Performs return selected keywords
Returns: None
retrieve_lit_chunks(self)
Purpose: Performs retrieve lit chunks
Returns: None
generate_report(self, event)
Purpose: Performs generate report
Parameters:
event: Parameter
Returns: None
report_choice(self, report_type)
Purpose: Performs report choice
Parameters:
report_type: Parameter
Returns: None
adapt_report(self, event)
Purpose: Performs adapt report
Parameters:
event: Parameter
Returns: None
display_search(self, event, filter, handback, field)
Purpose: Performs display search
Parameters:
event: Parameter
filter: Parameter
handback: Parameter
field: Parameter
Returns: None
open_json_editor(self, event)
Purpose: Performs open json editor
Parameters:
event: Parameter
Returns: None
save_json_editor(self, event)
Purpose: Performs save json editor
Parameters:
event: Parameter
Returns: None
show_similars(self, event)
Purpose: Performs show similars
Parameters:
event: Parameter
Returns: None
show_chemical(self, event)
Purpose: Performs show chemical
Parameters:
event: Parameter
Returns: None
add_to_cart(self, event)
Purpose: Performs add to cart
Parameters:
event: Parameter
Returns: None
update_anchor_point(self, UID)
Purpose: Updates anchor point
Parameters:
UID: Parameter
Returns: None
update_process_anchor_change(self, event)
Purpose: Updates process anchor change
Parameters:
event: Parameter
Returns: None
left_chat_update(self)
Purpose: Performs left chat update
Returns: None
right_chat_update(self)
Purpose: Performs right chat update
Returns: None
create_chatbox(self)
Purpose: Creates chatbox
Returns: None
create_viewbox(self, UID)
Purpose: Creates a viewbox for the given UID. This method performs the following steps: 1. Sets a loading indicator. 2. Retrieves the target node from the graph database using the provided UID. 3. Determines the access state (read or write) based on the current user's keys. 4. If the user has no access, displays an access denied message. 5. If the target node is a template, creates an editable view with appropriate widgets. 6. If the target node is not a template, creates a detailed view with widgets and buttons for interaction. 7. Adds options for adding new fields and removing nodes if the user has write access. 8. Updates navigation elements based on the target node's labels. 9. Resets the loading indicator. Parameters: UID (str): The unique identifier of the target node. Returns: None
Parameters:
UID: Parameter
Returns: See docstring for return details
floater_close(self, event)
Purpose: Performs floater close
Parameters:
event: Parameter
Returns: None
go_to_new(self, event)
Purpose: Performs go to new
Parameters:
event: Parameter
Returns: None
save_relationship(self, event)
Purpose: Performs save relationship
Parameters:
event: Parameter
Returns: None
add_new_relation_field(self, event)
Purpose: Performs add new relation field
Parameters:
event: Parameter
Returns: None
remove_relation(self, event)
Purpose: Deletes relation
Parameters:
event: Parameter
Returns: None
get_start_node(self, relationship)
Purpose: Retrieve the start node of a Neo4j relationship from the database. Parameters ---------- relationship : neo4j.graph.Relationship The relationship object from which to get the start node Returns ------- neo4j.graph.Node The complete start node object with all properties Notes ----- This method is designed for Neo4j 5.x+ with the official driver, using elementId() for node identification.
Parameters:
relationship: Parameter
Returns: See docstring for return details
get_end_node(self, relationship)
Purpose: Retrieve the end node of a Neo4j relationship from the database. Parameters ---------- relationship : neo4j.graph.Relationship The relationship object from which to get the end node Returns ------- neo4j.graph.Node The complete end node object with all properties Notes ----- This method is designed for Neo4j 5.x+ with the official driver, using elementId() for node identification.
Parameters:
relationship: Parameter
Returns: See docstring for return details
delete_relation_field(self, event)
Purpose: Deletes relation field
Parameters:
event: Parameter
Returns: None
take_startnode_name(self, elem)
Purpose: Performs take startnode name
Parameters:
elem: Parameter
Returns: None
take_endnode_name(self, elem)
Purpose: Performs take endnode name
Parameters:
elem: Parameter
Returns: None
update_upnav(self, target)
Purpose: Updates upnav
Parameters:
target: Parameter
Returns: None
update_downnav(self, target)
Purpose: Updates downnav
Parameters:
target: Parameter
Returns: None
_get_json_data(self, topnode) -> list
Purpose: Internal method: get json data
Parameters:
topnode: Type: str
Returns: Returns list
create_carrousel(self, top_UID)
Purpose: Creates carrousel
Parameters:
top_UID: Parameter
Returns: None
chat_viewer(self, event)
Purpose: Performs chat viewer
Parameters:
event: Parameter
Returns: None
tree_viewer(self, event, first)
Purpose: shows the search and spin screen to select studies and manage carrousel Parameters ---------- event : object | Nonetype Event object indicating whether this is the first run on the event.obj.first property first : bool Boolean showing whether this is the first run
Parameters:
event: Parameter
first: Parameter
Returns: None
direct_start_app(self, event)
Purpose: Shows the home screen of the app
Parameters:
event: Parameter
Returns: None
open_controlled_docs(self, event)
Purpose: Launch the controlled documents system and pass user session
Parameters:
event: Parameter
Returns: None
_options_page(self)
Purpose: Shows the options page of the app
Returns: None
Required Imports
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import absolute_import
import hashlib
from io import BytesIO
Usage Example
# Example usage:
# app = pathobrowser_base()
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
- class pathobrowser_base 99.4% similar
- class UserProfile_v1 41.9% similar
- class User 40.8% similar
- class CDocsApp 40.2% similar
- class PolicyBase 39.9% similar