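The simplest approach would probably be to add a `SplitBolt` that consumes from the `KafkaSpout`, evaluates field `x`, and forwards each tuple to a different output stream: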
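```java
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

public class SplitBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override // Storm 2.x signature; Storm 1.x uses a raw Map here
    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        Object x = input.getValueByField("x"); // get field x from input
        // a and b are the values from the question; compare with equals(), not ==
        String streamId = a.equals(x) ? "stream-xa" : "stream-xb"; // else branch: x == b
        // anchor the new tuple to the input, then ack it (BaseRichBolt does not ack automatically)
        collector.emit(streamId, input, input.getValues());
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields schema = new Fields(...); // same fields as the incoming tuple
        declarer.declareStream("stream-xa", schema);
        declarer.declareStream("stream-xb", schema); // must match the id emitted in execute()
    }
}
```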
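The routing rule inside `execute()` can be pulled out into a plain function so it can be unit-tested without a running Storm cluster. The sketch below is a hypothetical helper (`StreamRouter` is not part of Storm), assuming the value `a` is passed in by the caller; it also shows why `equals()` matters: a value-equal but distinct object would be misrouted by `==`.

```java
import java.util.Objects;

// Hypothetical helper, not part of Storm: isolates SplitBolt's routing rule.
public class StreamRouter {
    static final String STREAM_XA = "stream-xa";
    static final String STREAM_XB = "stream-xb";

    // The value that should go to stream-xa ("a" in the question); assumed to be supplied by the caller.
    private final Object a;

    public StreamRouter(Object a) {
        this.a = a;
    }

    /** Returns the output stream id for a given value of field x. */
    public String streamFor(Object x) {
        // Objects.equals compares by value and tolerates null, unlike ==
        return Objects.equals(a, x) ? STREAM_XA : STREAM_XB;
    }

    public static void main(String[] args) {
        StreamRouter router = new StreamRouter("a");
        System.out.println(router.streamFor("a"));             // stream-xa
        System.out.println(router.streamFor(new String("a"))); // stream-xa (== would have said xb)
        System.out.println(router.streamFor("b"));             // stream-xb
    }
}
```

Inside the bolt, `execute()` would then call `router.streamFor(x)` instead of inlining the comparison.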
When building the topology, connect `BoltA` to stream `"stream-xa"` and `BoltB` to stream `"stream-xb"`:
```java
TopologyBuilder b = new TopologyBuilder();
b.setSpout("spout", new KafkaSpout(...));
b.setBolt("split", new SplitBolt()).shuffleGrouping("spout");
// each bolt subscribes to exactly one named stream of the "split" component
b.setBolt("boltA", new BoltA()).shuffleGrouping("split", "stream-xa");
b.setBolt("boltB", new BoltB()).shuffleGrouping("split", "stream-xb");
```
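To make the stream wiring concrete, here is a toy in-memory model of named streams and subscriptions. It is an illustration only, not Storm's API: `StreamWiringDemo`, `declareStream`, `subscribe` and `emit` are hypothetical names. It mirrors two behaviours of the topology above: a subscriber only receives tuples from the `(component, stream)` pair it subscribed to, and subscribing to a stream that was never declared fails. Storm is expected to reject such a topology during validation, which is what a mismatched id such as `"stream-xy"` vs `"stream-xb"` would trigger.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Toy model of Storm-style stream routing; NOT Storm's API.
public class StreamWiringDemo {
    private final Set<String> declared = new HashSet<>();
    private final Map<String, List<Consumer<List<Object>>>> subscribers = new HashMap<>();

    private static String key(String component, String stream) {
        return component + "/" + stream;
    }

    // Counterpart of declarer.declareStream(...) in the producing component.
    void declareStream(String component, String stream) {
        declared.add(key(component, stream));
    }

    // Counterpart of shuffleGrouping(component, stream): fails on undeclared streams.
    void subscribe(String component, String stream, Consumer<List<Object>> bolt) {
        String k = key(component, stream);
        if (!declared.contains(k)) {
            throw new IllegalStateException("subscribes to non-existent stream: " + k);
        }
        subscribers.computeIfAbsent(k, unused -> new ArrayList<>()).add(bolt);
    }

    // Counterpart of collector.emit(streamId, ...): delivers only to that stream's subscribers.
    void emit(String component, String stream, List<Object> tuple) {
        subscribers.getOrDefault(key(component, stream), List.of()).forEach(b -> b.accept(tuple));
    }

    public static void main(String[] args) {
        StreamWiringDemo wiring = new StreamWiringDemo();
        wiring.declareStream("split", "stream-xa");
        wiring.declareStream("split", "stream-xb");

        List<List<Object>> seenByA = new ArrayList<>();
        List<List<Object>> seenByB = new ArrayList<>();
        wiring.subscribe("split", "stream-xa", seenByA::add);
        wiring.subscribe("split", "stream-xb", seenByB::add);

        wiring.emit("split", "stream-xa", List.of("a", 1));
        wiring.emit("split", "stream-xb", List.of("b", 2));

        System.out.println(seenByA); // [[a, 1]]
        System.out.println(seenByB); // [[b, 2]]
    }
}
```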
Alternatively, it should also be possible to subclass `KafkaSpout` and emit directly to two different streams. However, that code is harder to get right.