如何使用apachestorm tuple

aij0ehis  于 2021-06-21  发布在  Storm
关注(0)|答案(2)|浏览(359)

我刚从Apache风暴开始。我阅读了教程并查看了一些示例我的问题是,所有示例都使用非常简单的元组(通常是一个带有字符串的字段)。元组是内联创建的(使用新值(…))。在我的例子中,我有许多字段(5..100)的元组。所以我的问题是如何实现这样的元组,每个字段都有名称和类型(都是原语)?
有什么例子吗(我认为直接实现“tuple”不是一个好主意)
谢谢

lx0bsm1f

lx0bsm1f1#

用所有字段作为值创建元组的另一种方法是只创建一个bean并将其传递到元组中。
给定以下类别:

public class DataBean implements Serializable {
    private static final long serialVersionUID = 1L;

    // add more properties as necessary
    int id;
    String word;

    public DataBean(int id, String word) {
        setId(id);
        setWord(word);
    }
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getWord() {
        return word;
    }
    public void setWord(String word) {
        this.word = word;
    }
}

在一个螺栓中创建并发出数据Bean:

collector.emit(new Values(bean));

获取目标螺栓中的数据根:

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
    try {
        DataBean bean = (DataBean)tuple.getValue(0);
        // do your bolt processing with the bean
    } catch (Exception e) {
        LOG.error("WordCountBolt error", e);
        collector.reportError(e);
    }       
}

设置拓扑时,不要忘记使bean可序列化并注册:

Config stormConfig = new Config();
stormConfig.registerSerialization(DataBean.class);
// more stuff
StormSubmitter.submitTopology("MyTopologyName", stormConfig, builder.createTopology());

免责声明:bean可以很好地用于随机分组。如果你需要 fieldsGrouping ,仍应使用原语。例如,在字数计算场景中,需要按字分组,以便发出:

collector.emit(new Values(word, bean));
w9apscun

w9apscun2#

我将实现一个定制的tuple/value类型,如下所示:不是使用成员变量来存储数据,而是将每个属性Map到一个固定索引,并将其Map到继承的 Values 类型。这种方法避免了常规bean的“字段分组”问题。
不需要为字段分组添加其他属性(这很不自然)
避免了数据重复(减少了传输的字节数)
它保留了bean模式的优势
单词计数示例如下:

public class WordCountTuple extends Values {
    private final static long serialVersionUID = -4386109322233754497L;

    // attribute indexes
    /**The index of the word attribute. */
    public final static int WRD_IDX = 0;
    /**The index of the count attribute. */
    public final static int CNT_IDX = 1;

    // attribute names
    /**The name of the word attribute. */
    public final static String WRD_ATT = "word";
    /**The name of the count attribute. */
    public final static String CNT_ATT = "count";

    // required for serialization
    public WordCountTuple() {}

    public WordCountTuple(String word, int count) {
        super.add(WRD_IDX, word);
        super.add(CNT_IDX, count);
    }

    public String getWord() {
        return (String)super.get(WRD_IDX);
    }

    public void setWort(String word) {
        super.set(WRD_IDX, word);
    }

    public int getCount() {
        return (Integer)super.get(CNT_IDX);
    }

    public void setCount(int count) {
        super.set(CNT_IDX, count);
    }

    public static Fields getSchema() {
        return new Fields(WRD_ATT, CNT_ATT);
    }
}

为了避免矛盾, final static 使用“word”和“count”属性的变量。此外,一种方法 getSchema() 返回用于在spout/bolt方法中声明输出流的实现架构 .declareOutputFields(...) 对于输出元组,可以直接使用此类型:

public MyOutBolt implements IRichBolt {

    @Override
    public void execute(Tuple tuple) {
        // some more processing
        String word = ...
        int cnt = ...
        collector.emit(new WordCountTuple(word, cnt));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(WordCountTuple.getSchema());
    }

    // other methods omitted
}

对于输入元组,我建议使用以下模式:

public MyInBolt implements IRichBolt {
    // use a single instance for avoid GC trashing
    private final WordCountTuple input = new WordCountTuple();

    @Override
    public void execute(Tuple tuple) {
        this.input.clear();
        this.input.addAll(tuple.getValues());

        String word = input.getWord();
        int count = input.getCount();

        // do further processing
    }

    // other methods omitted
}
``` `MyOutBolt` 以及 `MyInBolt` 可按以下方式连接:

TopologyBuilder b = ...
b.setBolt("out", new MyOutBolt());
b.setBolt("in", new MyInBolt()).fieldsGrouping("out", WordCountTuple.WRD_ATT);

使用字段分组很简单,因为 `WordCountTuple` 允许单独访问每个属性。

相关问题