How can I batch tuples with Storm Trident?

nbysray5 asked on 2021-06-21 in Storm
Follow (0) | Answers (1) | Views (426)

I have been using Storm, and now I need more batching capability, so I searched for batching in Storm and found Trident, which does micro-batching in real time.
But somehow I cannot figure out how Trident handles micro-batching (the stream, the batch size, the batch interval) well enough to know whether it really does what I need.
What I would like to do is collect/save the tuples emitted by a spout over one interval and re-emit them to a downstream component/bolt/function at a different interval (for example, the spout emits one tuple per second, and the next Trident function collects those tuples and emits 50 of them to the following function once per minute).
Can someone guide me on how to use Trident in this case? Or any other applicable way to do this with Storm features?


5cnsuln7 1#

Good question! Unfortunately, this kind of micro-batching is not supported out of the box by Trident.
But you can try implementing your own frequency-driven micro-batching. Here is a skeleton example of something like that:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MicroBatchingBolt extends BaseRichBolt {

    private static final long serialVersionUID = 8500984730263268589L;
    private static final Logger LOG = LoggerFactory.getLogger(MicroBatchingBolt.class);

    protected LinkedBlockingQueue<Tuple> queue = new LinkedBlockingQueue<Tuple>();

    /**The threshold after which the batch should be flushed out. */
    int batchSize = 100;

    /**
     * The batch interval in sec. Minimum time between flushes if the batch sizes
     * are not met. This should typically be equal to
     * topology.tick.tuple.freq.secs and half of topology.message.timeout.secs
     */
    int batchIntervalInSec = 45;

    /**The last batch process time seconds. Used for tracking purpose */
    long lastBatchProcessTimeSeconds = 0;

    private OutputCollector collector;

    @Override
    @SuppressWarnings("rawtypes")
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
      // Check if the tuple is of type Tick Tuple
      if (isTickTuple(tuple)) {
         // If so, it is an indication to flush the batch. But don't flush if the
         // previous flush happened very recently (either because the batch size
         // threshold was crossed or because of another tick tuple).

        if ((System.currentTimeMillis() / 1000 - lastBatchProcessTimeSeconds) >= batchIntervalInSec) {
          LOG.debug("Current queue size is " + this.queue.size()
              + ". But received tick tuple so executing the batch");

          finishBatch();
        } else {
          LOG.debug("Current queue size is " + this.queue.size()
              + ". Received tick tuple but last batch was executed "
              + (System.currentTimeMillis() / 1000 - lastBatchProcessTimeSeconds)
              + " seconds back that is less than " + batchIntervalInSec
              + " so ignoring the tick tuple");
        }
      } else {
        // Add the tuple to queue. But don't ack it yet.
        this.queue.add(tuple);
        int queueSize = this.queue.size();
        LOG.debug("current queue size is " + queueSize);
        if (queueSize >= batchSize) {
          LOG.debug("Current queue size is >= " + batchSize
              + " executing the batch");

          finishBatch();
        }
      }
    }

    private boolean isTickTuple(Tuple tuple) {
        // Tick tuples are emitted by the system component on the tick stream
        return tuple.getSourceComponent().equals(org.apache.storm.Constants.SYSTEM_COMPONENT_ID)
            && tuple.getSourceStreamId().equals(org.apache.storm.Constants.SYSTEM_TICK_STREAM_ID);
    }

    /**
     * Finish batch.
     */
    public void finishBatch() {

      LOG.debug("Finishing batch of size " + queue.size());
      lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000;
      List<Tuple> tuples = new ArrayList<Tuple>();
      queue.drainTo(tuples);

      for (Tuple tuple : tuples) {
        // Prepare your batch here (it may be JDBC, HBase, Elasticsearch, Solr or
        // anything else).
        // List<Response> responses = externalApi.get("...");
      }

      try {
        // Execute your batch here and ack or fail the tuples
        LOG.debug("Executed the batch. Processing responses.");
        //        for (int counter = 0; counter < responses.length; counter++) {
        //          if (response.isFailed()) {
        //            LOG.error("Failed to process tuple # " + counter);
        //            this.collector.fail(tuples.get(counter));
        //          } else {
        //            LOG.debug("Successfully processed tuple # " + counter);
        //            this.collector.ack(tuples.get(counter));
        //          }
        //        }
      } catch (Exception e) {
        LOG.error("Unable to process " + tuples.size() + " tuples", e);
        // Fail entire batch
        for (Tuple tuple : tuples) {
          this.collector.fail(tuple);
        }
      }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // ... 
    }

}
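Note that the bolt above only receives tick tuples if the topology asks for them via `topology.tick.tuple.freq.secs`. A minimal sketch of the wiring (the spout name, the `MySpout` class and the component ids are placeholders, not part of the original answer):

```java
import org.apache.storm.Config;
import org.apache.storm.topology.TopologyBuilder;

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("source", new MySpout());              // placeholder spout
builder.setBolt("microBatcher", new MicroBatchingBolt())
       .shuffleGrouping("source");

Config conf = new Config();
// Deliver a tick tuple to every bolt each 45 s, matching batchIntervalInSec above
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 45);
// conf is then passed to StormSubmitter.submitTopology(...) or a LocalCluster
```

Alternatively, the bolt itself can override `getComponentConfiguration()` and set `topology.tick.tuple.freq.secs` there, so that only this component receives tick tuples instead of the whole topology.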

Source: http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/ (micro-batching with tick tuples in Storm)
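As a side note, the core drain-on-threshold mechanism the bolt relies on (buffer in a `LinkedBlockingQueue`, drain atomically once the size threshold is reached) can be tried out in plain Java without any Storm dependencies; the class and method names here are illustrative only:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class DrainDemo {

    /** Feed n items through the queue, draining a batch whenever batchSize is reached. */
    static List<List<String>> drainInBatches(int n, int batchSize,
                                             LinkedBlockingQueue<String> queue) {
        List<List<String>> batches = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            queue.add("tuple-" + i);
            if (queue.size() >= batchSize) {
                List<String> batch = new ArrayList<>();
                queue.drainTo(batch); // atomically moves everything out of the queue
                batches.add(batch);
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<List<String>> batches = drainInBatches(12, 5, queue);
        // prints "2 full batches, 2 tuples left in the queue"
        System.out.println(batches.size() + " full batches, "
            + queue.size() + " tuples left in the queue");
    }
}
```

The leftovers staying in the queue are exactly what the tick-tuple path in the bolt exists for: they get flushed by the next tick instead of waiting for the size threshold.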
