storm-bolt中的动态cassandra连接

hwamh0ep  于 2021-06-21  发布在  Storm
关注(0)|答案(2)|浏览(350)

我正在运行一个storm拓扑,它从kafka代理读取数据并写入cassandra。我的一个cassandra螺栓执行读写操作。我的键空间总是动态设置的。现在我想用连接池连接到cassandra?
我的流中有密钥空间名称。我需要动态地将数据插入适当的键空间/
1) 我尝试在中使用连接池方法获取cassandra连接 execute 方法,以便每个元组都获得一个cassandra连接。因此,经过一段时间后,我的连接达到了线程1024池的连接限制。连接超时错误。
例子:

ExecutorService pool = Executors.newFixedThreadPool(1024); 
public void execute(Tuple input)    {
  if(input.size()>0)        {          
     ConnectionManager cm=new ConnectionManager();     
     cm.keyspace = "dda400db5ef2d";
     statement = cm.poolRun();  
     cql="select * form columnfamily where id='d78346';
   }
}

2) 我试着用 prepare 方法初始化辅助进程并获取静态连接

public void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {          
  _OutputCollector=collector;
  ConnectionManager cm=new ConnectionManager();
  cm.keyspace ="dda400db5ef2d";    statement = cm.poolRun();    
} 

public void execute(Tuple input)  { 
  if(input.size()>0) {          
    cql="select * form columnfamily where id='d78346';
  }
}

如果数据属于一个键空间,则第二种情况有效。但是我的案例数据属于不同的键空间,这里只有一个拓扑,它将标识键空间并写入该键空间。
storm中是否有可用的哈希方法来保存键空间连接?

还有其他逻辑吗?

vfhzx4xs

vfhzx4xs1#

我不熟悉Cassandra,但我想这就是你想要的:

private ConnectionManager cm = null;
public void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {          
  _OutputCollector=collector;
  cm=new ConnectionManager();    
}

public void execute(Tuple input)  { 
  cm.keyspace = "dda400db5ef2d";
  statement = cm.poolRun();  
  cql="select * form columnfamily where id='d78346';
}

prepare方法在拓扑的开始处运行一次。可以使用它初始化变量/连接,并在execute方法中运行它们。您可能希望在查询完成后关闭连接,并在下一次查询时重置连接。希望这有帮助

gywdnpxw

gywdnpxw2#

做以下事情-
1) 不只是cassandra集群,而是在bolt的prepare方法中创建一个会话。创建会话时不要使用keyspace。

public void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {   
    CassandraConnector client = new CassandraConnector();
    client.connect("127.0.0.1", 9142);
    this.session = client.getSession();
}

2) 现在根据数据创建查询。在这里,您应该在keyspace名称前面加上表名。

public void execute(Tuple input)  { 
  if(input.size()>0) {          
     cql="select * form keyspace.table where id='d78346'";
  }
}

相关问题