How do I configure parallelism correctly for a persistor bolt?

yizd12fk, posted on 2021-06-24 in Storm

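I am using Apache Storm to build a topology that first reads a "stream" of tuples from a file, then splits the tuples and stores them in MongoDB.
I have a cluster on Atlas with a shared replica set. I have developed the topology, and the solution works correctly when I use a single thread.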

public static StormTopology build() {
    return buildWithSpout();
}

public static StormTopology buildWithSpout() {
    Config config = new Config();
    TopologyBuilder builder = new TopologyBuilder();

    CsvSpout datasetSpout = new CsvSpout("file.txt");
    SplitterBolt splitterBolt = new SplitterBolt(",");
    PartitionMongoInsertBolt insertPartitionBolt = new PartitionMongoInsertBolt();

    // One executor per component: with this setup every tuple is stored.
    builder.setSpout(DATA_SPOUT_ID, datasetSpout, 1);
    builder.setBolt(DEPENDENCY_SPLITTER_ID, splitterBolt, 1).shuffleGrouping(DATA_SPOUT_ID);
    builder.setBolt(UPDATER_COUNTER_ID, insertPartitionBolt, 1).shuffleGrouping(DEPENDENCY_SPLITTER_ID);

    // The method is declared to return a StormTopology, so build and return it.
    return builder.createTopology();
}

However, when I run the bolts in parallel, my persistor bolt does not save all the tuples to MongoDB, even though the previous bolt emits them correctly.

builder.setSpout(DATA_SPOUT_ID, datasetSpout, 1);
builder.setBolt(DEPENDENCY_SPLITTER_ID, splitterBolt, 3).shuffleGrouping(DATA_SPOUT_ID);
builder.setBolt(UPDATER_COUNTER_ID, insertPartitionBolt, 3).shuffleGrouping(DEPENDENCY_SPLITTER_ID);
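
For context on what changes with three executors: under shuffleGrouping, tuples that carry the same binaryattr value can be delivered to different insert-bolt tasks, so writes to the same MongoDB document may interleave. The sketch below is a plain-Java illustration of routing by key hash, which is roughly the idea behind Storm's fieldsGrouping. It is not Storm's actual implementation; the class name, method names, and sample keys are hypothetical.

```java
import java.util.Random;

// Hypothetical illustration of how a target task could be chosen for a tuple.
class GroupingSketch {

    // Shuffle-style routing: the key is ignored, so any task may get the tuple.
    static int shuffleTarget(Random rng, int numTasks) {
        return rng.nextInt(numTasks);
    }

    // Fields-style routing: the same key always maps to the same task index.
    static int fieldsTarget(String key, int numTasks) {
        return Math.floorMod(key.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 3;
        // Made-up keys standing in for "binaryattr" values.
        String[] keys = {"10000", "01000", "10000", "00100", "10000"};
        Random rng = new Random();
        for (String key : keys) {
            System.out.println(key
                + " -> fields task " + fieldsTarget(key, numTasks)
                + ", shuffle task " + shuffleTarget(rng, numTasks));
        }
    }
}
```

In the topology itself, the corresponding change would be to replace shuffleGrouping(DEPENDENCY_SPLITTER_ID) on the insert bolt with fieldsGrouping(DEPENDENCY_SPLITTER_ID, new Fields("binaryattr")). This is an untested suggestion for this particular topology; it would serialize writes per key inside one task, but it does not replace making the MongoDB write itself atomic.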

This is my first bolt:

public class SplitterBolt extends BaseBasicBolt {
    private String del;
    private MongoConnector db = null;

    public SplitterBolt(String del) {
        this.del = del;
    }

    public void prepare(Map stormConf, TopologyContext context) {
        db = MongoConnector.getInstance();
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        String tuple = input.getStringByField("tuple");
        int idTuple = Integer.parseInt(input.getStringByField("id"));

        String opString = "";
        String[] data = tuple.split(this.del);
        for(int i=0; i < data.length; i++) {
            OpenBitSet attrs = new OpenBitSet();
            attrs.fastSet(i);
            opString = Utility.toStringOpenBitSet(attrs, 5);
            collector.emit(new Values(idTuple, opString, data[i]));
        }
        db.incrementCount();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("idtuple","binaryattr","value"));
    }
}

This is my persistor bolt, which stores all the tuples in MongoDB:

public class PartitionMongoInsertBolt extends BaseBasicBolt {
    private MongoConnector mongodb = null;

    public void prepare(Map stormConf, TopologyContext context) {
        //Singleton Instance
        mongodb = MongoConnector.getInstance();
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        mongodb.insertUpdateTuple(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}

My only doubt is that I use the singleton pattern for my MongoDB connection class. Could that be a problem?
Update
This is my MongoConnector class:

public class MongoConnector {
    private MongoClient mongoClient = null;
    private MongoDatabase database = null;
    private MongoCollection<Document> partitionCollection = null;

    private static MongoConnector mongoInstance = null;

    public MongoConnector() {
        MongoClientURI uri = new MongoClientURI("connection string");
        this.mongoClient = new MongoClient(uri);
        this.database = mongoClient.getDatabase("db.database");
        this.partitionCollection = database.getCollection("db.collection");
    }

    public static MongoConnector getInstance() {
        if (mongoInstance == null)
            mongoInstance = new MongoConnector();
        return mongoInstance;
    }

    public void insertUpdateTuple(Tuple tuple) {
        int idTuple = (Integer) tuple.getValue(0);
        String attrs = (String) tuple.getValue(1);
        String value = (String) tuple.getValue(2);
        value = value.replace('.', ',');

        Bson query = Filters.eq("_id", attrs);
        Document docIterator = this.partitionCollection.find(query).first();

        if (docIterator != null) { 
            Bson newValue = new Document(value, idTuple);
            Bson updateDocument = new Document("$push", newValue);
            this.partitionCollection.updateOne(docIterator, updateDocument);
        } else { 
            Document document = new Document();
            document.put("_id", attrs);
            ArrayList<Integer> partition = new ArrayList<Integer>();
            partition.add(idTuple);
            document.put(value, partition);
            this.partitionCollection.insertOne(document);
        }
    }
}
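
On the singleton question: executors that run in the same worker JVM share static state, so several bolt threads may call getInstance() at about the same time from prepare. The `if (mongoInstance == null)` check above is not synchronized, so more than one MongoConnector could be created. Sharing one MongoClient across threads is fine in itself, since the driver's client is designed for that. A minimal sketch of one common alternative, the initialization-on-demand holder idiom, follows; ConnectorHolderSketch is a hypothetical stand-in and deliberately opens no real connection.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for MongoConnector; it opens no real connection.
class ConnectorHolderSketch {

    private ConnectorHolderSketch() {
        // A real connector would create its MongoClient here, exactly once.
    }

    // The JVM initializes Holder lazily and thread-safely on first access.
    private static class Holder {
        static final ConnectorHolderSketch INSTANCE = new ConnectorHolderSketch();
    }

    static ConnectorHolderSketch getInstance() {
        return Holder.INSTANCE;
    }

    // Calls getInstance() from several threads and counts distinct instances.
    static int distinctInstances(int threads) {
        Set<ConnectorHolderSketch> seen = ConcurrentHashMap.newKeySet();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await(); // release all threads together
                    seen.add(getInstance());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        start.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct instances: " + distinctInstances(8));
    }
}
```

The holder idiom needs no explicit locking because class initialization is already thread-safe in the JVM. A synchronized getInstance() would be an equally valid choice here.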

Solution update
I solved the problem by changing this line:

this.partitionCollection.updateOne(docIterator, updateDocument);

to

this.partitionCollection.findOneAndUpdate(query, updateDocument);
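
A likely reason this change helps, offered as an interpretation rather than something verified against this cluster: the original code reads the document and then passes the whole document it read as the filter to updateOne. If another executor pushes to that document in between, the filter no longer matches and the update is skipped without an error. findOneAndUpdate(query, updateDocument) filters only on _id and applies the $push atomically. The insert branch still has a read-then-insert gap; the driver's updateOne(filter, update, new UpdateOptions().upsert(true)) is one way to close it. The sketch below is an in-memory analogue using a map instead of MongoDB, with hypothetical names throughout, that replays the stale-filter case and contrasts it with an atomic create-or-append.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// In-memory analogue of the partition collection: _id -> list of tuple ids.
class UpsertSketch {
    private final ConcurrentHashMap<String, List<Integer>> docs = new ConcurrentHashMap<>();

    // Analogue of updateOne(docIterator, ...): applies only if the stored
    // document still equals the snapshot that was read earlier.
    boolean pushIfUnchanged(String id, List<Integer> snapshot, int idTuple) {
        List<Integer> updated = new ArrayList<>(snapshot);
        updated.add(idTuple);
        return docs.replace(id, snapshot, updated);
    }

    // Analogue of an atomic upsert with $push: create or append in one step.
    void atomicPush(String id, int idTuple) {
        docs.compute(id, (key, old) -> {
            List<Integer> updated = (old == null) ? new ArrayList<>() : new ArrayList<>(old);
            updated.add(idTuple);
            return updated;
        });
    }

    List<Integer> get(String id) {
        return docs.get(id);
    }

    // Replays the interleaving step by step: both writers read the same
    // snapshot, so the second update no longer matches and is dropped.
    static int lostUpdateDemo() {
        UpsertSketch store = new UpsertSketch();
        store.atomicPush("10000", 1);
        List<Integer> snapshotA = store.get("10000");
        List<Integer> snapshotB = store.get("10000");
        store.pushIfUnchanged("10000", snapshotA, 2); // matches, applied
        store.pushIfUnchanged("10000", snapshotB, 3); // stale, skipped
        return store.get("10000").size();
    }

    // Several threads push concurrently through the atomic path.
    static int atomicDemo(int threads, int perThread) {
        UpsertSketch store = new UpsertSketch();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int base = t * perThread;
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    store.atomicPush("10000", base + i);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return store.get("10000").size();
    }

    public static void main(String[] args) {
        System.out.println("stale-filter path kept " + lostUpdateDemo() + " of 3 ids");
        System.out.println("atomic path kept " + atomicDemo(3, 100) + " of 300 ids");
    }
}
```

The stale-filter path keeps only two of the three ids, while the atomic path keeps all 300. This matches the symptom described in the question, where tuples are emitted by the splitter but some never show up in the collection.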
