python知识图谱-利用py2neo实现neo4j的dao类

py2neo的基本使用以及cypher的基本使用:https://blog.csdn.net/Akun_2217/article/details/135445147?spm=1001.2014.3001.5502

1. dao类需要实现的功能

neo4j实现dao类的基本单位就是子图,包括单个节点、单个子图,都属于子图。

数据库的创建 创建子图 查询子图 更改子图 删除子图 事务功能

一次性插入多条数据时,要么都成功要么都失败

2. dao类具体设计

日志模块实现:https://blog.csdn.net/Akun_2217/article/details/135442731?spm=1001.2014.3001.5502

初始化对象进行连接:_init_执行非查询cypher语句: execute执行查询cypher语句: query切换数据库: select_db开启事务: begin_transaction事务执行: transaction_execute提交事务: commit_transaction回滚事务: rollback_transaction关闭连接 : close_connect日志记录

3. 利用py2neo实现neo4j的dao类

实际工作中使用时,将neo4j日志路径以及连接参数,卸载一个配置文件中。 cypher语言使用两种形式, 以match为例,其他类似:

run(“match (n:boss) where n.name=‘biden’ set n.name=$newname”, **{‘newname’:‘aoguanhai’})run(“match (n:boss) where n.name=‘biden’ set n.name=$newname”, newname=‘auguanhai’)

import py2neo

from utils import logger_utils

logger = logger_utils.get_logger('./')

class Neo4jDao:

def __init__(self, url, auth):

try:

self.graph_server = py2neo.GraphService(url, auth=auth)

logger.info(f'服务连接成功,主机地址:{url}....')

except Exception as e:

logger.info(f'服务连接失败,原因:{e}, 主机地址:{url}')

# 使用默认数据库

self.graph_server = None

self.graph = None

self.tx = None

def select_db(self, db_name):

if self.graph_server:

if db_name in self.graph_server.keys():

self.graph = self.graph_server[db_name]

self.tx = None

logger.info(f'已经切换数据库,数据库切换为:{db_name}, 当前事务清零')

else:

logger.error(f'切换数据库失败, 数据库{db_name}不存在')

else:

logger.error(f'数据库切换失败,原因:数据服务未连接...')

def execute(self, cypher, **args):

'''自动提交-执行非查询cypher,无返回结果'''

try:

self.graph.run(cypher, args)

logger.info(f'自动提价执行-{cypher}执行完成。。。')

except Exception as e:

if not self.graph:

logger.error(f'自动提交执行失败-{cypher}执行失败,原因:未选择数据库')

else:

logger.error(f'自动提交执行失败-{cypher}执行失败,原因: {e}')

def query(self, cypher, **args):

try:

cursor = self.graph.run(cypher, args)

logger.info(f'查询成功-{cypher}执行成功')

return cursor

except Exception as e:

if not self.graph:

logger.error(f'查询执行失败-{cypher}执行失败,原因:未选择数据库')

else:

logger.error(f'查询执行失败-{cypher}执行失败,原因: {e}')

def transaction_execute(self, cypher, **args):

'''非自动提交事务执行非查询语句'''

try:

self.tx.run(cypher, args)

logger.info(f'事务执行成功, {cypher}执行成功')

except Exception as e:

if not self.tx:

logger.error(f'事务执行失败,{cypher}执行失败,原因:未开启事务')

else:

logger.error(f'事务执行失败,{cypher}执行失败,原因: {e}')

def begin_transaction(self):

if self.tx:

logger.warning('开启事务冲突,当前连接已经开启事务')

logger.warning('关闭当前连接的事务')

self.tx = None

try:

self.tx = self.graph.begin()

logger.info('事务已经开启...')

except Exception as e:

if not self.graph:

logger.error(f'事务开启失败,原因:未选择数据库')

else:

logger.error(f'事务开启失败,原因: {e}')

def rollback_transaction(self):

try:

self.graph.rollback(self.tx)

logger.info('事务已经回滚...')

except Exception as e:

logger.error(f'事务回滚失败, 原因{e}')

def commit_transaction(self):

try:

self.graph.commit(self.tx)

self.tx = None

logger.info('事务提交已经完成...事务结束')

except Exception as e:

logger.error(f'事务提交失败, 原因{e}')

def close_connect(self):

self.graph_server = None

logger.info('neo4j服务连接诶已经关闭...')

if __name__ == '__main__':

neo4j = Neo4jDao("bolt://localhost:7687", auth=("neo4j", "a86xavwahgwwy@@@"))

neo4j.select_db('neo4j')

record = neo4j.query('match (n) return n')

print(record.to_data_frame())

精彩文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: