如何将cassandra集群连接从一个螺栓传递到另一个螺栓

kiz8lqtg  于 2021-06-21  发布在  Storm
关注(0)|答案(1)|浏览(313)

storm拓扑从kafka读取数据并写入cassandra表
在storm中,我正在prepare方法中创建cassandra集群连接和会话。

cassandraCluster = Cluster.builder().withoutJMXReporting().withoutMetrics()
            .addContactPoints(nodes)
            .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
            .withReconnectionPolicy(new ExponentialReconnectionPolicy(100L,
                    TimeUnit.MINUTES.toMillis(5)))
            .withLoadBalancingPolicy(
                    new TokenAwarePolicy(new RoundRobinPolicy()))
            .build();

session = cassandraCluster.connect(keyspace);

在execute方法中,我可以处理元组并将其保存在cassandra表中
假设如果我想将数据从一个元组写入多个表,那么为每个表编写单独的bolt将是一个不错的选择。但是我必须在每个螺栓中创建集群连接和会话表。
但是在这个链接中,每个集群一个连接将是提高性能的好主意http://www.datastax.com/dev/blog/4-simple-rules-when-using-the-datastax-drivers-for-cassandra
你们有没有想过在一个螺栓中创建群集连接并在另一个螺栓中使用此连接?

yi0zb3m4

yi0zb3m41#

这取决于风暴如何分配螺栓和喷口的工人。您不能假设可以共享螺栓之间的连接,因为它们可能运行在不同的worker(读:jvms)中,或者完全运行在不同的节点上。
在这里看到我的答案:mongo连接池风暴拓扑
可能类似于以下伪代码:

public class CassandraBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private static Logger LOG = LoggerFactory.getLogger(CassandraBolt.class);
   OutputCollector _collector;

   // whatever your cassandra session is
   // has to be transient because session is not serializable
   protected transient CassandraSession _session;

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        _collector = collector;

      // maybe get properties from stormConf instead of hard coding them
        cassandraCluster = Cluster.builder().withoutJMXReporting().withoutMetrics()
            .addContactPoints(nodes)
            .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
            .withReconnectionPolicy(new ExponentialReconnectionPolicy(100L,
                    TimeUnit.MINUTES.toMillis(5)))
            .withLoadBalancingPolicy(
                    new TokenAwarePolicy(new RoundRobinPolicy()))
            .build();

      _session = cassandraCluster.connect(keyspace);
    }

    @Override
    public void execute(Tuple input) {
        try {
            // use _session to talk to cassandra

        } catch (Exception e) {
            LOG.error("CassandraBolt error", e);
            _collector.reportError(e);
        }   
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
    }
}

相关问题