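I am running a Storm topology that writes data to a Cassandra database, and every 15 minutes it copies the data from Cassandra to MongoDB using a tick tuple.
When I start the topology, it runs fine for the first 15 minutes and writes data to Cassandra. Then all the data is transferred to MongoDB and deleted from Cassandra. After that point, however, no more data is written to Cassandra. The data still arrives in my execute method, but no error is thrown, and I have checked the logs: there are no errors.
This is the execute method that inserts data into Cassandra: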
public void execute(Tuple input, BasicOutputCollector collector) {
    String term = input.getString(0);
    int year = input.getInteger(1);
    int month = input.getInteger(2);
    int day = input.getInteger(3);
    int hour = input.getInteger(4);
    int dayofyear = input.getInteger(5);
    int weekofyear = input.getInteger(6);
    int productcount = input.getInteger(7);
    String sessionid = input.getString(8);
    /**
     * Inserting Values In Cassandra
     */
    BoundStatement insertUpdateTable = new BoundStatement(pStatement);
    insertUpdateTable.bind(sessionid, term, year, month, day, hour,
            dayofyear, weekofyear, productcount);
    session.executeAsync(insertUpdateTable);
    LOG.info(
            "Inserted data into table topquery:{},{},{},{},{},{},{},{},{} ",
            term, year, month, day, hour, dayofyear, weekofyear,
            productcount, sessionid);
}
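One thing worth noting about the method above: `session.executeAsync(...)` returns a future, and the result of that future is never inspected, so a failed write would not throw inside `execute` and would not appear in the logs. The `LOG.info` line is also printed whether or not the insert succeeded. The following is only a minimal sketch of that effect. It uses the JDK's `CompletableFuture` as a stand-in for the driver's result future, and `fakeExecuteAsync`, `fireAndForget`, and `writeWithCallback` are hypothetical names invented for illustration, not part of the Cassandra driver.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class AsyncWriteSketch {
    // Collected failure messages; a real bolt would call LOG.error here instead.
    static final List<String> failures = new CopyOnWriteArrayList<>();

    // Hypothetical stand-in for session.executeAsync(...): the returned
    // future completes exceptionally when the simulated write fails.
    static CompletableFuture<Void> fakeExecuteAsync(boolean fail) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new IllegalStateException("write failed"));
        } else {
            future.complete(null);
        }
        return future;
    }

    // Same shape as the execute method above: the future is dropped,
    // so a failure is never observed by the caller.
    static void fireAndForget(boolean fail) {
        fakeExecuteAsync(fail);
    }

    // Attaching a callback makes the failure visible.
    static void writeWithCallback(boolean fail) {
        fakeExecuteAsync(fail).whenComplete((result, error) -> {
            if (error != null) {
                failures.add(error.getMessage());
            }
        });
    }

    public static void main(String[] args) {
        fireAndForget(true);      // fails silently
        writeWithCallback(true);  // failure is recorded
        System.out.println("Recorded failures: " + failures);
    }
}
```

With the DataStax Java driver 2.x/3.x, the object returned by `executeAsync` is a `ResultSetFuture` (a Guava `ListenableFuture`), so a comparable check would be to attach a callback to it or, while debugging, to switch temporarily to the blocking `session.execute(...)` so that any exception surfaces in the bolt. Whether a write failure is actually what happens here is an assumption; this only shows why the logs could stay clean either way.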
This is the tick-tuple bolt:
public void prepare(Map stormConf, TopologyContext context) {
    KEYSPACE = stormConf.get(ApplicationConstants._CASSANDRA_KEYSPACE);
    LOG.info("KEYSPACE AT PREPARE METHOD IN TickTuple : {}", KEYSPACE);
}

@Override
public Map<String, Object> getComponentConfiguration() {
    // configure how often a tick tuple will be sent to our bolt
    Config conf = new Config();
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, TICK_TUPLE_FREQ_SECS);
    return conf;
}

protected static boolean isTickTuple(Tuple tuple) {
    return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
            && tuple.getSourceStreamId().equals(
                    Constants.SYSTEM_TICK_STREAM_ID);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}

@Override
public void execute(Tuple input, BasicOutputCollector collector) {
    try {
        if (isTickTuple(input)) {
            ArrayList<String> sessionid = new ArrayList<String>();
            /**
             * Returns the List of Inactive SessionID.
             */
            sessionid = InactiveSessionIdTimeStampCassandra
                    .returnInactiveSessionId(KEYSPACE.toString());
            LOG.info("sessionid in TickTuple METHOD IN TickTuple : {}",
                    sessionid);
            /**
             * Exports the Data from Cassandra Tables to MongoDB based on
             * the SessionID returned by the
             * InactiveSessionIdTimeStampCassandra
             * .returnInactiveSessionId().
             */
            CassExport.cassExp(KEYSPACE.toString(),
                    TABLE_CASSANDRA_TOP_QUERY, KEYSPACE.toString(),
                    MONGO_COLLECTION_TopQuery, sessionid);
            LOG.info("cassEport in TickTuple METHOD IN TickTuple : {}",
                    KEYSPACE.toString());
            /**
             * Deletes the Data from the Cassandra Tables based on the
             * SessionID returned by the InactiveSessionIdTimeStampCassandra
             * .returnInactiveSessionId().
             */
            TruncateCassandraTable.truncateData(TABLE_CASSANDRA_TOP_QUERY,
                    sessionid, KEYSPACE.toString());
            LOG.info("In Truncate");
            //return;
        }
    } catch (Exception e) {
        LOG.error("Exception in TickTuple", e);
    }
}
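For readers unfamiliar with tick tuples, the check in `isTickTuple` can be illustrated without a running cluster. The sketch below is a plain-Java stand-in, not Storm code: it hard-codes the string values behind `Constants.SYSTEM_COMPONENT_ID` (`"__system"`) and `Constants.SYSTEM_TICK_STREAM_ID` (`"__tick"`), and it assumes that `TICK_TUPLE_FREQ_SECS` in the bolt above is meant to be 15 minutes expressed in seconds. The class name `TickCheckSketch` and the sample component name `"query-spout"` are made up for illustration.

```java
import java.util.concurrent.TimeUnit;

class TickCheckSketch {
    // String values of Storm's Constants.SYSTEM_COMPONENT_ID and
    // Constants.SYSTEM_TICK_STREAM_ID, copied here so the sketch has no dependencies.
    static final String SYSTEM_COMPONENT_ID = "__system";
    static final String SYSTEM_TICK_STREAM_ID = "__tick";

    // Assumed interval: 15 minutes, in the seconds unit that
    // Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS expects.
    static final int TICK_TUPLE_FREQ_SECS = (int) TimeUnit.MINUTES.toSeconds(15);

    // Same logic as isTickTuple above, applied to the two identifying strings.
    static boolean isTick(String sourceComponent, String sourceStreamId) {
        return SYSTEM_COMPONENT_ID.equals(sourceComponent)
                && SYSTEM_TICK_STREAM_ID.equals(sourceStreamId);
    }

    public static void main(String[] args) {
        System.out.println("Tick interval in seconds: " + TICK_TUPLE_FREQ_SECS);
        System.out.println("System tick: " + isTick("__system", "__tick"));
        System.out.println("Data tuple:  " + isTick("query-spout", "default"));
    }
}
```

In the bolt above, only tuples for which this check is true trigger the export and delete steps; any ordinary data tuple routed to this bolt falls through the `if` and is ignored.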
I cannot figure out the cause.