我已经用尽了想法试图找到如此低的写入速度的原因。我的背景是关系数据库,所以我可能做错了什么。要添加10个节点和45个连接,我目前需要1.4秒(空DB)。这是不可接受的,在我看来,如果甚至应该是毫秒级。
要求
创建方法,将快照添加到Neo4j数据库中。一个快照由10个节点组成(所有节点都具有不同的标签和属性)。我需要单向连接此快照中的所有节点,并且不使用递归连接。这相当于每个快照有10个节点45个连接。属性强度= 1时创建关系。每次我添加一个新的关系,如果它已经存在(意味着match nodeA(oHash) -> nodeB(oHash)
),我只是增加强度而不是重复。
测量值
我补偿了API、Python本身等方面的所有开销。目前超过99.9%的执行时间来自查询Neo4j。我观察到生成节点似乎比生成连接慢得多(大约占总时间的90%)。
要在空数据库中生成一个快照(10个节点,45个连接),我的查询(来自Python)需要1.4秒,平均运行100次。
索引
在我将在下面的文章中展示的代码中,你会发现一个我从未调用过的创建约束方法。这是因为我已经在所有节点/标签类型上创建了索引,并且我删除了对它的调用,以减少检查现有索引的开销。每个节点都有一个“oHash”属性,它是所有属性(不包括内部Neo4j)的json的MD5哈希。这唯一地标识了我的节点,所以我在“oHash”上创建了一个UNIQUE约束。据我所知,创建UNIQUE约束也会在Neo4j中创建该属性的索引。
最佳实践
我使用了我在网上能找到的所有推荐的最佳实践。其中包括:
1.创建单个驱动程序示例并重用它
1.创建单个驱动程序会话并重用它
1.使用显式事务
1.使用查询参数
1.创建批处理并作为单个事务执行
执行情况
以下是我目前的实现:
import json
import hashlib
import uuid
from neo4j import GraphDatabase
class SnapshotRepository:
"""A repository to handle snapshots in a Neo4j database."""
def __init__(self):
"""Initialize a connection to the Neo4j database."""
with open("config.json", "r") as file:
config = json.load(file)
self._driver = GraphDatabase.driver(
config["uri"], auth=(config["username"], config["password"])
)
self._session = self._driver.session()
def delete_all(self):
"""Delete all nodes and relationships from the graph."""
self._session.run("MATCH (n) DETACH DELETE n")
def add_snapshot(self, data):
"""
Add a snapshot to the Neo4j database.
Args:
data (dict): The snapshot data to be added.
"""
snapshot_id = str(uuid.uuid4()) # Generate a unique snapshot ID
self._session.execute_write(self._add_complete_graph, data, snapshot_id)
def _create_constraints(self, tx, labels):
"""
Create uniqueness constraints for the specified labels.
Args:
tx (neo4j.Transaction): The transaction to be executed.
labels (list): List of labels for which to create uniqueness constraints.
"""
for label in labels:
tx.run(f"CREATE CONSTRAINT IF NOT EXISTS FOR (n:{label}) REQUIRE n.oHash IS UNIQUE")
@staticmethod
def _calculate_oHash(node):
"""
Calculate the oHash for a node based on its properties.
Args:
node (dict): The node properties.
Returns:
str: The calculated oHash.
"""
properties = {k: v for k, v in node.items() if k not in ['id', 'snapshotId', 'oHash']}
properties_json = json.dumps(properties, sort_keys=True)
return hashlib.md5(properties_json.encode('utf-8')).hexdigest()
def _create_or_update_nodes(self, tx, nodes, snapshot_id):
"""
Create or update nodes in the graph.
Args:
tx (neo4j.Transaction): The transaction to be executed.
nodes (list): The nodes to be created or updated.
snapshot_id (str): The ID of the snapshot.
"""
for node in nodes:
node['oHash'] = self._calculate_oHash(node)
node['snapshotId'] = snapshot_id
tx.run("""
MERGE (n:{0} {{oHash: $oHash}})
ON CREATE SET n = $props
ON MATCH SET n = $props
""".format(node['label']), oHash=node['oHash'], props=node)
def _create_relationships(self, tx, prev, curr):
"""
Create relationships between nodes in the graph.
Args:
tx (neo4j.Transaction): The transaction to be executed.
prev (dict): The properties of the previous node.
curr (dict): The properties of the current node.
"""
if prev and curr:
oHashA = self._calculate_oHash(prev)
oHashB = self._calculate_oHash(curr)
tx.run("""
MATCH (a:{0} {{oHash: $oHashA}}), (b:{1} {{oHash: $oHashB}})
MERGE (a)-[r:HAS_NEXT]->(b)
ON CREATE SET r.strength = 1
ON MATCH SET r.strength = r.strength + 1
""".format(prev['label'], curr['label']), oHashA=oHashA, oHashB=oHashB)
def _add_complete_graph(self, tx, data, snapshot_id):
"""
Add a complete graph to the Neo4j database for a given snapshot.
Args:
tx (neo4j.Transaction): The transaction to be executed.
data (dict): The snapshot data.
snapshot_id (str): The ID of the snapshot.
"""
nodes = data['nodes']
self._create_or_update_nodes(tx, nodes, snapshot_id)
tx.run("""
MATCH (a {snapshotId: $snapshotId}), (b {snapshotId: $snapshotId})
WHERE a.oHash < b.oHash
MERGE (a)-[r:HAS]->(b)
ON CREATE SET r.strength = 1, r.snapshotId = $snapshotId
ON MATCH SET r.strength = r.strength + 1
""", snapshotId=snapshot_id)
self._create_relationships(tx, data.get('previousMatchSnapshotNode', None), data.get('currentMatchSnapshotNode', None))
欢迎所有的输入和建议。
3条答案
按热度按时间1sbrub3j1#
密码查询似乎没问题。不确定python驱动程序是如何处理请求的,但它会不会直接发送所有cypher语句?也就是说,你经历了很长的持续时间,因为你不是在等待一个单一的网络响应,而是10 + 45 = 55个网络响应?1.4s / 55 =每个请求25 ms。另外,如果你把所有的cypher语句打印到控制台,复制所有的东西,然后在Neo4j的控制台手动执行,那么执行时间是多久?
顺便说一下,你可以使用UNWIND关键字来加载数据并迭代它。这将减少解析cypher语句所需的时间,同时减少多个小请求的开销:https://medium.com/neo4j/5-tips-tricks-for-fast-batched-updates-of-graph-structures-with-neo4j-and-cypher-73c7f693c8cc
jdzmm42g2#
这可能不是主要问题,但是
_add_complete_graph
中的MATCH
子句没有为a
或b
指定节点标签。这将阻止使用任何索引,强制扫描所有节点以查找其中的每个节点。必须指定节点标签才能使用相应的索引。
[更新]
如果您事先不知道
a
和b
的标签,则有一种解决方法。您可以向所有可能的a
和b
节点添加一个新标签(比如Snapshot
),并在:Snapshot(snapshotId)
上创建一个index或uniqueness constraint。vuktfyat3#
我已经找到解决办法了。对于每个可能偶然发现这个线程的人来说,这里有两个代码错误。
首先,我从不关闭会议。由于我一直在处理GCP功能,我完全忘记了这一点。我把它添加到类析构函数中:
第二,我读了Neo4j性能推荐文档:https://neo4j.com/docs/python-manual/current/performance/在创建驱动程序时不设置数据库名称会在执行许多查询时导致显著的开销,这就是我的情况。
当运行
EXPLAIN
时,我在分析器中发现DB查找是大多数查询的主要部分。我通常在SQL连接器中设置数据库名称,并且从不考虑它。