我正在用apache flink和neo4j(社区版)开发一个数据分析应用程序。
在这个应用程序中,flink sink必须在neo4j中保存/更新关系。
neo4j会话管理的最佳方式是什么?为什么?
首次实施:
public class MySink extends RichSinkFunction<Link> {
private DbConfiguration dbconfig;
private Driver driver;
@Override
public void open(Configuration parameters) throws Exception {
this.driver = Neo4JManager.open(this.dbconfig);
}
@Override
public void close() throws Exception {
this.driver.close();
}
@Override
public void invoke(Link link) throws Exception {
Session session = this.driver.session();
Neo4JManager.saveLink(session, link);
session.close();
}
}
第二次实施:
public class MySink extends RichSinkFunction<Link> {
private DbConfiguration dbconfig;
private Driver driver;
private Session session;
@Override
public void open(Configuration parameters) throws Exception {
this.driver = Neo4JManager.open(this.dbconfig);
this.session = driver.session();
}
@Override
public void close() throws Exception {
this.session.close();
this.driver.close();
}
@Override
public void invoke(Link link) throws Exception {
Neo4JManager.saveLink(this.session, link);
}
}
在这两种实现中,都使用了以下功能:
public class Neo4JManager {
public static Driver open(DbConfiguration dbconf) {
AuthToken auth = AuthTokens.basic(dbconf.getUsername(), dbconf.getPassword());
Config config = Config.build().withEncryptionLevel(Config.EncryptionLevel.NONE ).toConfig();
return GraphDatabase.driver(dbconf.getHostname(), auth, config);
}
public static void saveLink(Session session, Link link) {
Value params = parameters("x", link.x, "y", link.y);
session.run('CREATE (Person {id:{x}}-[FOLLOWS]->(Person {id:{y}}))'
}
}
谢谢您。
暂无答案!
目前还没有任何答案,快来回答吧!