storm:tick tuple产生意外结果

m4pnthwp  于 2021-06-21  发布在  Storm
关注(0)|答案(0)|浏览(172)

我正在运行一个storm拓扑,它将数据写入cassandra db,每隔15分钟它就会将数据从cassandra复制到mongodb Tick-Tuple 为此,当我启动拓扑时,它会一直正常运行到前15分钟,然后将数据写入cassandra,之后所有数据都会传输到mongodb并从cassandra中删除。但是,在那之后,数据就不会被写入Cassandra。虽然我会把所有的数据放进我的execute方法中,但是它不会抛出任何错误,我也检查了日志,没有错误。
这是将数据插入cassandra的execute方法:

public void execute(Tuple input, BasicOutputCollector collector) {

    String term = input.getString(0);
    int year = input.getInteger(1);
    int month = input.getInteger(2);
    int day = input.getInteger(3);
    int hour = input.getInteger(4);
    int dayofyear = input.getInteger(5);
    int weekofyear = input.getInteger(6);
    int productcount = input.getInteger(7);
    String sessionid = input.getString(8);

    /**
     * Inserting Values In Cassandra
     */

    BoundStatement insertUpdateTable = new BoundStatement(pStatement);
    insertUpdateTable.bind(sessionid, term, year, month, day, hour,
            dayofyear, weekofyear, productcount);

    session.executeAsync(insertUpdateTable);

    LOG.info(
            "Inserted data into table topquery:{},{},{},{},{},{},{},{},{} ",
            term, year, month, day, hour, dayofyear, weekofyear,
            productcount, sessionid);

}

这是记号元组:

public void prepare(Map stormConf, TopologyContext context) {

    KEYSPACE = stormConf.get(ApplicationConstants._CASSANDRA_KEYSPACE);
    LOG.info("KEYSPACE AT PREPARE METHOD IN TickTuple : {}", KEYSPACE);

}

@Override
public Map<String, Object> getComponentConfiguration() {
    // configure how often a tick tuple will be sent to our bolt
    Config conf = new Config();
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, TICK_TUPLE_FREQ_SECS);
    return conf;
}

protected static boolean isTickTuple(Tuple tuple) {
    return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
            && tuple.getSourceStreamId().equals(
                    Constants.SYSTEM_TICK_STREAM_ID);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}

@Override
public void execute(Tuple input, BasicOutputCollector collector) {

    try {
        if (isTickTuple(input)) {

            ArrayList<String> sessionid = new ArrayList<String>();
            /**
             * Returns the List of Inactive SessionID.
             */
            sessionid = InactiveSessionIdTimeStampCassandra
                    .returnInactiveSessionId(KEYSPACE.toString());
            LOG.info("sessionid in TickTuple METHOD IN TickTuple : {}",
                    sessionid);
            /**
             * Exports the Data from Cassandra Tables to MongoDB based on
             * the SessionID returned by the
             * InactiveSessionIdTimeStampCassandra
             * .returnInactiveSessionId().
             */
            CassExport.cassExp(KEYSPACE.toString(),
                    TABLE_CASSANDRA_TOP_QUERY, KEYSPACE.toString(),
                    MONGO_COLLECTION_TopQuery, sessionid);
            LOG.info("cassEport in TickTuple METHOD IN TickTuple : {}",
                    KEYSPACE.toString());
            /**
             * Deletes the Data from the Cassandra Tables based on the
             * SessionID returned by the InactiveSessionIdTimeStampCassandra
             * .returnInactiveSessionId().
             */
            TruncateCassandraTable.truncateData(TABLE_CASSANDRA_TOP_QUERY,
                    sessionid, KEYSPACE.toString());

            LOG.info("In Truncate");
            //return;
        }
    } catch (Exception e) {
        LOG.error("Exception in TickTuple", e);
    }

}

我无法找出原因。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题