我正在运行一个storm拓扑,它从kafka代理读取数据并写入cassandra。我的一个cassandra螺栓执行读写操作。我的键空间总是动态设置的。现在我想用连接池连接到cassandra?
我的流中有密钥空间名称。我需要动态地将数据插入适当的键空间/
1) 我尝试在中使用连接池方法获取cassandra连接 execute
方法,以便每个元组都获得一个cassandra连接。因此,经过一段时间后,我的连接达到了线程1024池的连接限制。连接超时错误。
例子:
ExecutorService pool = Executors.newFixedThreadPool(1024);
public void execute(Tuple input) {
if(input.size()>0) {
ConnectionManager cm=new ConnectionManager();
cm.keyspace = "dda400db5ef2d";
statement = cm.poolRun();
cql="select * form columnfamily where id='d78346';
}
}
2) 我试着用 prepare
方法初始化辅助进程并获取静态连接
public void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {
_OutputCollector=collector;
ConnectionManager cm=new ConnectionManager();
cm.keyspace ="dda400db5ef2d"; statement = cm.poolRun();
}
public void execute(Tuple input) {
if(input.size()>0) {
cql="select * form columnfamily where id='d78346';
}
}
如果数据属于一个键空间,则第二种情况有效。但是我的案例数据属于不同的键空间,这里只有一个拓扑,它将标识键空间并写入该键空间。
storm中是否有可用的哈希方法来保存键空间连接?
或
还有其他逻辑吗?
2条答案
按热度按时间vfhzx4xs1#
我不熟悉Cassandra,但我想这就是你想要的:
prepare方法在拓扑的开始处运行一次。可以使用它初始化变量/连接,并在execute方法中运行它们。您可能希望在查询完成后关闭连接,并在下一次查询时重置连接。希望这有帮助
gywdnpxw2#
做以下事情-
1) 不只是cassandra集群,而是在bolt的prepare方法中创建一个会话。创建会话时不要使用keyspace。
2) 现在根据数据创建查询。在这里,您应该在keyspace名称前面加上表名。