有条件地消耗Kafka喷口的风暴流?

guykilcj  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(338)

我有一个场景,将json发布到kafka示例。然后我用一个Kafka喷口把水流喷射到一个螺栓上。
但现在我想添加额外的字段(称为 x )我的json消息。如果 xa 我希望它能被博尔塔吃掉,如果 xb 我希望它能被博尔特吃掉。
有没有办法根据水流的内容将水流引导到合适的螺栓上?

exdqitrt

exdqitrt1#

最简单的方法应该是添加 SplitBolt 消耗来自 KafkaSpout ,对字段求值 x ,并转发到不同的输出流:

public class SplitBolt extends BaseRichBolt {
  OutputCollector collector;

  public void prepare(...) {
    this.collector = collector;
  }

  public void execute(Tuple input) {
    Object x = ... // get field x from input
    String streamId;
    if(x == a) {
      streamId = "stream-xa";
    } else { // x == b
      streamId = "stream-xb";
    }
    collector.emit(streamId, input, input.getValues());
  }

  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    Fields schema = new Fields(...)
    declarer.declareStream("stream-xa", schema);
    declarer.declareStream("stream-xy", schema);
  }
}

构建拓扑时,需要连接 BoltA “流xa”和 BoltB “流xb”:

TopologyBuilder b = new TopologyBuilder();
b.setSpout("spout", new KafkaSpout(...));
b.setBolt("split", new SplitBolt()).shuffleGrouping("spout");
b.setBolt("boltA", new BoltA()).shuffleGrouping("split", "stream-xa");
b.setBolt("boltB", new BoltB()).shuffleGrouping("split", "stream-xb");

作为另一种选择,它也应该是可能的,继承自 KafkaSpout 直接发射到两个不同的流。然而,代码要想正确就更难了。

相关问题