class Graph
A Graph class that provides an interface for interacting with a Neo4j graph database, supporting CRUD operations on nodes and relationships through Cypher queries.
/tf/active/vicechatdev/neo4j_driver/neo4j_driver.py
92 - 328
complex
Purpose
This class serves as a wrapper around the Neo4j Python driver, providing convenient methods for connecting to a Neo4j database and performing common operations like matching nodes by ID/UID/name, creating/updating/deleting nodes and relationships, and executing custom Cypher queries. It handles connection management, automatic reconnection on service unavailability, and provides a clean interface for working with graph entities.
Source Code
class Graph():
    """
    Interface to a Neo4j graph database built on the official Python driver.

    Provides helpers for running Cypher queries, matching nodes by id, UID
    and name, matching relationships by id, and creating, pushing, pulling
    and deleting entities. Public methods that talk to the database are
    wrapped with ``catch_service_unavailable``, which re-opens the driver
    and retries once when the driver raises ``ServiceUnavailable``.
    """

    def __init__(self, URI, auth, database=None, name=None):
        """
        Create the driver and remember which database to target.

        Parameters:
            URI: connection URI handed to ``GraphDatabase.driver``.
            auth: credentials handed to ``GraphDatabase.driver``.
            database: database name used for every session.
            name: fallback database name, used when ``database`` is falsy.
        """
        # Kept so that open() can build a replacement driver after a failure.
        self._uri = URI
        self._auth = auth
        self.driver = GraphDatabase.driver(URI, auth=auth)
        self.database = database or name

    def close(self):
        """Close the underlying driver."""
        self.driver.close()

    def open(self):
        """
        (Re)create the driver from the URI and credentials given at construction.

        The previous driver, if any, is closed first so its resources are
        released before it is replaced.
        """
        previous = getattr(self, "driver", None)
        if previous is not None:
            try:
                previous.close()
            except Exception:
                # Best effort only: the old driver is being discarded because
                # it is already unusable; a failing close must not prevent
                # the reconnect.
                pass
        self.driver = GraphDatabase.driver(self._uri, auth=self._auth)

    def __repr__(self):
        """Describe the bound host and database."""
        return "Graph interface bound to host %s to database '%s'" % (self.driver.initial_addresses[0], self.database)

    __str__ = __repr__

    @staticmethod
    def _get_label_strings(labels):
        """
        Build the Cypher label suffix for ``labels`` (e.g. ``:Person:Employee``).

        Returns an empty string when ``labels`` is None or contains None,
        which makes the generated MATCH/MERGE pattern label-agnostic.
        """
        # NOTE(review): labels are interpolated into the query text verbatim;
        # callers must not pass untrusted label names.
        if labels is None or None in labels:
            return ''
        return ''.join(":" + i for i in labels)

    @staticmethod
    def _coerce_id(x):
        """
        Return ``x`` as an int.

        Raises:
            ValueError: if ``x`` cannot be converted with ``int()``.
        """
        if isinstance(x, int):
            return x
        try:
            return int(x)
        except (TypeError, ValueError, OverflowError) as err:
            raise ValueError("Failed to coerce id to type int. Element id must be of type int, passed '%s' of type %s" % (x, type(x))) from err

    def catch_service_unavailable(func):
        """
        Decorator: on ``ServiceUnavailable`` re-open the driver and retry once.

        A second failure propagates to the caller.
        """
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            try:
                return func(self, *args, **kwargs)
            except ServiceUnavailable:
                self.open()
                return func(self, *args, **kwargs)
        return wrapper

    @staticmethod
    def _run(tx, query, **kwargs):
        """Run ``query`` with ``kwargs`` as parameters and return all records as a list."""
        result = tx.run(query, **kwargs)
        records = list(result)
        result.consume()
        return records

    @catch_service_unavailable
    def run(self, query, **kwargs):
        """
        Execute a Cypher query directly on a session.

        Parameters:
            query: Cypher text.
            **kwargs: query parameters.

        Returns:
            ResultWrapper built from the list of returned records.
        """
        with self.driver.session(database=self.database) as session:
            records = self._run(session, query, **kwargs)
        return ResultWrapper(records, graph=self)

    @staticmethod
    def _match_by_id(tx, x, label):
        """Return the node whose internal id is ``x``, restricted to ``label``."""
        result = tx.run("MATCH (o%s) WHERE id(o) = $x RETURN o" % Graph._get_label_strings(label), x=x)
        # NOTE(review): indexing the value of single() fails when nothing
        # matched (presumably single() yields None then) - confirm the
        # intended not-found behaviour.
        return result.single()[0]

    @catch_service_unavailable
    def match_by_id(self, x, label=None):
        """
        Fetch a node by its internal id.

        Parameters:
            x: node id; anything ``int()`` accepts.
            label: a single label or a list of labels; None means any label.

        Returns:
            Node built with ``Node._from_neo4j_node``.

        Raises:
            ValueError: if ``x`` cannot be coerced to int.
        """
        x = self._coerce_id(x)
        if not isinstance(label, list):
            label = [label]
        with self.driver.session(database=self.database) as session:
            result = session.execute_read(self._match_by_id, x, label)
        return Node._from_neo4j_node(result, graph=self)

    @staticmethod
    def _match_by_uid(tx, uid, label):
        """Return the node whose ``UID`` property equals ``uid``, restricted to ``label``."""
        result = tx.run("MATCH (o%s {UID:$uid}) RETURN o" % Graph._get_label_strings(label), uid=uid)
        record = result.single()[0]
        result.consume()
        return record

    @catch_service_unavailable
    def match_by_uid(self, uid, label=None):
        """
        Fetch a node by its ``UID`` property.

        Parameters:
            uid: value of the ``UID`` property.
            label: a single label or a list of labels; None means any label.

        Returns:
            Node built with ``Node._from_neo4j_node``.
        """
        if not isinstance(label, list):
            label = [label]
        with self.driver.session(database=self.database) as session:
            result = session.execute_read(self._match_by_uid, uid, label)
        return Node._from_neo4j_node(result, graph=self)

    @staticmethod
    def _match_by_name(tx, name, label):
        """Return the node whose ``N`` property equals ``name``, restricted to ``label``."""
        # The pattern binds the node as ``o``, so ``o`` is what must be returned.
        result = tx.run("MATCH (o%s {N:$name}) RETURN o" % Graph._get_label_strings(label), name=name)
        return result.single()[0]

    @catch_service_unavailable
    def match_by_name(self, name, label=None):
        """
        Fetch a node by its ``N`` property.

        Parameters:
            name: value of the ``N`` property.
            label: a single label or a list of labels; None means any label.

        Returns:
            Node built with ``Node._from_neo4j_node``.
        """
        if not isinstance(label, list):
            label = [label]
        with self.driver.session(database=self.database) as session:
            result = session.execute_read(self._match_by_name, name, label)
        return Node._from_neo4j_node(result, graph=self)

    @staticmethod
    def _match_relationship_by_id(tx, x):
        """Return the relationship whose internal id is ``x``."""
        result = tx.run("MATCH ()-[_]->() WHERE id(_) = $x RETURN _", x=x)
        return result.single()[0]

    @catch_service_unavailable
    def match_relationship_by_id(self, x):
        """
        Fetch a relationship by its internal id.

        Parameters:
            x: relationship id; anything ``int()`` accepts.

        Returns:
            Relationship built with ``Relationship._from_neo4j_node``.

        Raises:
            ValueError: if ``x`` cannot be coerced to int.
        """
        x = self._coerce_id(x)
        with self.driver.session(database=self.database) as session:
            result = session.execute_read(self._match_relationship_by_id, x)
        return Relationship._from_neo4j_node(result, graph=self)

    @staticmethod
    def _push(tx, element_id, properties):
        """Replace all properties of the node with id ``element_id`` by ``properties``."""
        result = tx.run("MATCH (o) WHERE id(o) = $x SET o = $properties", x=int(element_id), properties=properties)
        records = list(result)
        result.consume()
        return records

    @catch_service_unavailable
    def push(self, node):
        """
        Write the node's current properties to the database.

        ``SET o = $properties`` replaces the stored property map, so
        properties missing from ``node`` are removed in the database.

        Raises:
            AssertionError: if the node is unbound, bound to another graph,
                or has no element id.
        """
        # NOTE(review): these checks are asserts and therefore vanish under
        # ``python -O``; they are kept because callers may rely on
        # AssertionError.
        assert node.graph, "Node is not associated with any database. Please use graph.create for new nodes, or retrieve the node from the database first."
        assert node.graph == self, "Entity bound to different database."
        assert node.element_id, "Please run graph.create when creating a node for the first time."
        items = dict(node)
        with self.driver.session(database=self.database) as session:
            session.execute_write(self._push, node.element_id, items)
        return

    @staticmethod
    def _node_pull(tx, ids):
        """Return (id, labels, properties) records for the nodes whose id is in ``ids``."""
        query = tx.run("MATCH (_) WHERE id(_) in $x "
                       "RETURN id(_), labels(_), properties(_)", x=ids)
        return list(query)

    @staticmethod
    def _relationship_pull(tx, ids):
        """Return (id, properties) records for the relationships whose id is in ``ids``."""
        result = tx.run("MATCH ()-[_]->() WHERE id(_) in $x "
                        "RETURN id(_), properties(_)", x=ids)
        return list(result)

    @catch_service_unavailable
    def pull(self, entity):
        """
        Refresh the nodes and relationships of ``entity`` from the database.

        Only members bound to this graph are refreshed. Nodes whose element
        id cannot be coerced to int are skipped with a warning. Labels and
        properties of each returned node, and properties of each returned
        relationship, are cleared and replaced by the stored values.
        """
        nodes = {}
        for node in entity.nodes:
            if node.graph == self:
                if not isinstance(node.element_id, int):
                    try:
                        node.element_id = int(node.element_id)
                    except (TypeError, ValueError, OverflowError):
                        warnings.warn("Could not coerce element id to int, skipped node %s" % node.element_id, stacklevel=5)
                        continue
                nodes[node.element_id] = node
                node._lock = True
        try:
            with self.driver.session(database=self.database) as session:
                query = session.execute_read(self._node_pull, list(nodes.keys()))
            for element_id, new_labels, new_properties in query:
                node = nodes[element_id]
                node.clear_labels()
                node.update_labels(new_labels)
                node.clear()
                node.update(new_properties)
        finally:
            # Release every lock taken above, including on nodes the query
            # did not return and when the read itself failed.
            for node in nodes.values():
                node._lock = False
        relationships = {}
        for relationship in entity.relationships:
            if relationship.graph == self:
                relationships[relationship.element_id] = relationship
        with self.driver.session(database=self.database) as session:
            query = session.execute_read(self._relationship_pull, list(relationships.keys()))
        for element_id, new_properties in query:
            relationship = relationships[element_id]
            relationship.clear()
            relationship.update(new_properties)

    @staticmethod
    def _create(tx, query, data):
        """Run ``query`` with ``data`` bound to ``$data`` and return all records."""
        result = tx.run(query, data=data)
        return list(result)

    @catch_service_unavailable
    def create(self, entity):
        """
        Merge the nodes and relationships of ``entity`` into the database.

        Nodes without an element id are grouped by label set and merged on
        their ``UID`` property; properties are written only when the node is
        newly created. Relationships are grouped by label set and merged
        between their start and end nodes, with their properties replaced.
        Each processed node and relationship is bound to this graph and
        receives the id returned by the database.
        """
        entity.graph = self  # mostly to bind subgraphs
        node_dict = {}
        for node in entity.nodes:
            # NOTE(review): both tests are truthiness checks - a node that
            # evaluates falsy is skipped, and an element id of 0 is treated
            # as "not yet created". Confirm against the Node class.
            if node:
                if not node.element_id:
                    key = frozenset(node.labels)
                    node_dict.setdefault(key, []).append(node)
        rel_dict = {}
        for relationship in entity.relationships:
            key = frozenset(relationship.labels)
            rel_dict.setdefault(key, []).append(relationship)
        for labels, nodes in node_dict.items():
            query = """
                UNWIND $data AS d
                MERGE (_%s {UID:d.UID})
                ON CREATE
                SET _ += d
                RETURN id(_)
                """ % self._get_label_strings(labels)
            with self.driver.session(database=self.database) as session:
                result = session.execute_write(self._create, query, list(map(dict, nodes)))
            # Records are matched to the input nodes by position.
            for i, return_id in enumerate(result):
                node = nodes[i]
                node.graph = self
                node.element_id = return_id.value()
        for labels, relationships in rel_dict.items():
            # Built after the node loop so freshly assigned node ids are used.
            data = [
                [r.start_node.element_id, dict(r.relationship), r.end_node.element_id]
                for r in relationships
            ]
            query = """
                UNWIND $data as d
                MATCH (a) WHERE id(a) = d[0]
                MATCH (b) WHERE id(b) = d[2]
                MERGE (a)-[_%s]->(b) SET _ = d[1]
                RETURN id(_)
                """ % self._get_label_strings(labels)
            with self.driver.session(database=self.database) as session:
                result = session.execute_write(self._create, query, data)
            for i, return_id in enumerate(result):
                rel = relationships[i]
                rel.graph = self
                rel.element_id = return_id.value()

    @staticmethod
    def _delete(tx, identities):
        """DETACH DELETE the nodes whose internal id is in ``identities``."""
        result = tx.run("MATCH (_) WHERE id(_) IN $x DETACH DELETE _", x=identities)
        return list(result)

    @staticmethod
    def _delete_relationships(tx, identities):
        """DELETE the relationships whose internal id is in ``identities``."""
        result = tx.run("MATCH ()-[_]->() WHERE id(_) IN $x DELETE _", x=identities)
        return list(result)

    @catch_service_unavailable
    def delete(self, entity):
        """
        Delete the relationships and nodes of ``entity`` from the database.

        Relationship ids and node ids are resolved by separate queries: the
        ``id()`` of a relationship and the ``id()`` of a node are looked up
        against different kinds of entities, so a relationship id must never
        be fed to the node pattern. Nodes are removed with DETACH DELETE.
        """
        relationship_ids = [
            rel.element_id for rel in entity.relationships
            if rel.element_id is not None
        ]
        # NOTE(review): truthiness test kept from the original; a node whose
        # element id is 0 is skipped here - confirm the unbound default.
        node_ids = [node.element_id for node in entity.nodes if node.element_id]
        with self.driver.session(database=self.database) as session:
            if relationship_ids:
                session.execute_write(self._delete_relationships, relationship_ids)
            session.execute_write(self._delete, node_ids)
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
bases |
- | - |
Parameter Details
URI: The connection URI for the Neo4j database (e.g., 'bolt://localhost:7687' or 'neo4j://localhost:7687'). This specifies the protocol, host, and port for the database connection.
auth: Authentication credentials for the Neo4j database, typically a tuple of (username, password) or an auth object created by neo4j.basic_auth(). Required for database access.
database: Optional name of the specific database to connect to within the Neo4j instance. If not provided, falls back to the 'name' parameter or uses the default database.
name: Alternative parameter name for specifying the database name. Deprecated in favor of 'database' parameter but maintained for backward compatibility.
Return Value
Instantiation returns a Graph object that maintains a connection to the Neo4j database. Methods return various types: run() returns a ResultWrapper object, match methods return Node or Relationship objects, create/push/pull/delete methods return None or modify entities in place.
Class Interface
Methods
__init__(self, URI, auth, database=None, name=None)
Purpose: Initialize a Graph instance with connection parameters to a Neo4j database
Parameters:
URI: Connection URI for Neo4j database
auth: Authentication credentials (username, password tuple or auth object)
database: Optional database name to connect to
name: Alternative parameter for database name (deprecated)
Returns: None (constructor)
close(self)
Purpose: Close the Neo4j driver connection
Returns: None
open(self)
Purpose: Open/reopen the Neo4j driver connection
Returns: None
__repr__(self) -> str
Purpose: Return a string representation of the Graph instance showing host and database
Returns: String describing the graph interface connection
_get_label_strings(labels) -> str
static
Purpose: Convert a list of labels into a Cypher-compatible label string (e.g., ':Person:Employee')
Parameters:
labels: List of label strings or None
Returns: Formatted label string for Cypher queries, empty string if None in labels
catch_service_unavailable(func)
Purpose: Decorator that catches ServiceUnavailable exceptions and attempts to reconnect before retrying
Parameters:
func: Function to wrap with exception handling
Returns: Wrapped function with automatic reconnection on service unavailability
_run(tx, query, **kwargs) -> list
static
Purpose: Execute a Cypher query within a transaction and return all records
Parameters:
tx: Neo4j transaction object
query: Cypher query string to execute
kwargs: Parameters to pass to the query
Returns: List of result records from the query
run(self, query, **kwargs) -> ResultWrapper
Purpose: Execute a Cypher query and return results wrapped in a ResultWrapper object
Parameters:
query: Cypher query string to execute
kwargs: Parameters to pass to the query
Returns: ResultWrapper object containing query results
_match_by_id(tx, x, label)
static
Purpose: Internal method to match a node by its Neo4j internal ID within a transaction
Parameters:
tx: Neo4j transaction object
x: Integer ID of the node
label: List of labels to filter by
Returns: Neo4j node object
match_by_id(self, x, label=None) -> Node
Purpose: Match and return a node by its Neo4j internal ID
Parameters:
x: Integer or string ID of the node (will be coerced to int)
label: Optional label or list of labels to filter by
Returns: Node object representing the matched node
_match_by_uid(tx, uid, label)
static
Purpose: Internal method to match a node by its UID property within a transaction
Parameters:
tx: Neo4j transaction object
uid: UID property value to match
label: List of labels to filter by
Returns: Neo4j node object
match_by_uid(self, uid, label=None) -> Node
Purpose: Match and return a node by its UID property
Parameters:
uid: UID property value to match
label: Optional label or list of labels to filter by
Returns: Node object representing the matched node
_match_by_name(tx, name, label)
static
Purpose: Internal method to match a node by its N (name) property within a transaction
Parameters:
tx: Neo4j transaction object
name: Name property value to match
label: List of labels to filter by
Returns: Neo4j node object
match_by_name(self, name, label=None) -> Node
Purpose: Match and return a node by its N (name) property
Parameters:
name: Name property value to match
label: Optional label or list of labels to filter by
Returns: Node object representing the matched node
_match_relationship_by_id(tx, x)
static
Purpose: Internal method to match a relationship by its Neo4j internal ID within a transaction
Parameters:
tx: Neo4j transaction object
x: Integer ID of the relationship
Returns: Neo4j relationship object
match_relationship_by_id(self, x) -> Relationship
Purpose: Match and return a relationship by its Neo4j internal ID
Parameters:
x: Integer or string ID of the relationship (will be coerced to int)
Returns: Relationship object representing the matched relationship
_push(tx, element_id, properties)
static
Purpose: Internal method to update a node's properties in the database within a transaction
Parameters:
tx: Neo4j transaction object
element_id: Integer ID of the node to update
properties: Dictionary of properties to set on the node
Returns: List of result records
push(self, node) -> None
Purpose: Update an existing node's properties in the database
Parameters:
node: Node object to push to the database (must have element_id and be bound to this graph)
Returns: None
_node_pull(tx, ids) -> list
static
Purpose: Internal method to retrieve node data by IDs within a transaction
Parameters:
tx: Neo4j transaction object
ids: List of node IDs to retrieve
Returns: List of tuples containing (id, labels, properties) for each node
_relationship_pull(tx, ids) -> list
static
Purpose: Internal method to retrieve relationship data by IDs within a transaction
Parameters:
tx: Neo4j transaction object
ids: List of relationship IDs to retrieve
Returns: List of tuples containing (id, properties) for each relationship
pull(self, entity) -> None
Purpose: Refresh an entity (node or subgraph) with the latest data from the database
Parameters:
entity: Entity object (Node, Relationship, or Subgraph) to refresh from database
Returns: None (updates entity in place)
_create(tx, query, data) -> list
static
Purpose: Internal method to execute a create/merge query within a transaction
Parameters:
tx: Neo4j transaction object
query: Cypher query string for creating entities
data: List of data dictionaries to create
Returns: List of result records
create(self, entity) -> None
Purpose: Create nodes and relationships in the database, using MERGE to avoid duplicates based on UID
Parameters:
entity: Entity object (Node, Relationship, or Subgraph) to create in database
Returns: None (updates entity element_ids in place)
_delete(tx, identities) -> list
static
Purpose: Internal method to delete nodes and relationships by ID within a transaction
Parameters:
tx: Neo4j transaction object
identities: List of element IDs to delete
Returns: List of result records
delete(self, entity) -> None
Purpose: Delete nodes and relationships from the database (performs DETACH DELETE)
Parameters:
entity: Entity object (Node, Relationship, or Subgraph) to delete from database
Returns: None
Attributes
| Name | Type | Description | Scope |
|---|---|---|---|
driver |
neo4j.GraphDatabase.driver | Neo4j driver instance used for all database connections and operations | instance |
database |
str or None | Name of the specific Neo4j database to connect to, or None for default database | instance |
Dependencies
neo4j
functools
warnings
pandas
Required Imports
from neo4j import GraphDatabase
from neo4j.exceptions import ServiceUnavailable
from functools import wraps
import warnings
Conditional/Optional Imports
These imports are only needed under specific conditions:
from neo4j_objects import *
Condition: Required for Node, Relationship, and ResultWrapper classes that are used throughout the Graph class methods
Required (conditional)
import pandas as pd
Condition: Imported in source file but not directly used in this class; may be used by related classes or for data processing
Optional
Usage Example
from neo4j import GraphDatabase
from neo4j.auth import basic_auth
# Create a Graph instance
graph = Graph(
URI='bolt://localhost:7687',
auth=basic_auth('neo4j', 'password'),
database='neo4j'
)
# Run a Cypher query
result = graph.run('MATCH (n:Person) RETURN n LIMIT 10')
# Match a node by ID
node = graph.match_by_id(123, label=['Person'])
# Match a node by UID property
node = graph.match_by_uid('unique-id-123', label=['Person'])
# Create a new node (assuming Node class is available)
from neo4j_objects import Node
new_node = Node(labels=['Person'], properties={'name': 'John', 'UID': 'john-123'})
graph.create(new_node)
# Update an existing node
node['age'] = 30
graph.push(node)
# Pull latest data from database
graph.pull(node)
# Delete a node
graph.delete(node)
# Close the connection when done
graph.close()
Best Practices
- Always call close() when done with the Graph instance to properly release database connections
- Use context managers or try-finally blocks to ensure connections are closed even if errors occur
- The class automatically handles ServiceUnavailable exceptions and attempts to reconnect, but persistent connection issues should be investigated
- Nodes must have a UID property for proper MERGE operations in create(); relationships are merged on their start node, end node and type rather than on a UID
- Before calling push(), ensure the node has been created in the database (has an element_id)
- The pull() method locks nodes during update to prevent concurrent modifications
- Label parameters can be passed as a single string or list of strings; they are automatically converted to lists internally
- Element IDs must be integers; the class attempts to coerce string IDs to integers but will raise ValueError if coercion fails
- When creating entities, the graph property is automatically set on nodes and relationships
- The delete() method performs DETACH DELETE, which removes all relationships before deleting nodes
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
-
class Neo4jManager 74.9% similar
-
class Node 66.8% similar
-
class Relationship 64.3% similar
-
class Subgraph 59.5% similar
-
function run_query_v2 57.8% similar