风暴三叉戟:如何使用ipartitionedtrientspout?

gmxoilav  于 2021-06-24  发布在  Storm
关注(0)|答案(1)|浏览(301)

在我们的系统中,有多个数据生成器在共享文件系统中创建文件内容,并指示 DataSourceId 在文件名中。需要有一个公平的调度机制来读取所有源生成的文件,解析、展平和丰富(使用参考数据)文件中的数据记录,批处理丰富的记录并写入数据库。
我使用 IPartitionedTridentSpout . 拓扑结构如下所示:

TransactionalTridentEsrSpout spout
    = new TransactionalTridentEsrSpout(NUM_OF_PARTITIONS);

TridentTopology topology = new TridentTopology();
topology.newStream("FileHandlerSpout", spout)
         .each(new Fields("filename", "esr"), new Utils.PrintFilter())
         .parallelismHint(NUM_OF_PARTITIONS)
         .shuffle()
         .each(new Fields("filename", "record"), new RecordFlattenerAndEnricher(), new elds("record-enriched"))
         .each(new Fields("filename", "record-enriched"), new Utils.PrintFilter())
         .project(new Fields(record-enriched")) // pass only required 
         .parallelismHint(PARALLELISM_HINT_FOR_ESR_FLATTENER_ENRICHER)
         .shuffle()
         .aggregate(new Fields("record-enriched"), new BlockWriterToDb(), new Fields("something"))
         .each(new Fields("something"), new Utils.PrintFilter())
         .parallelismHint(PARALLELISM_HINT_FOR_GP_WRITER);

由于数据文件非常大(通常是100万条记录),我读取了小批量的10k条记录。每 transactionId 生成者 Coordinator ,我的 Emitter 发出分区中当前/下一个文件的下一个10k记录。决赛 BlockWriter 将丰富的记录聚合到缓冲区中,并在“complete”方法调用时,将缓冲区写入db。
拓扑工作正常,但我有以下问题:
这个 parallelismHint 对于 ParttionedTridentSpout ,这会影响 Emitters ,设置为分区数。这个 parallelismHint 接下来的两层 FlattenerAndEnricher 以及 BlockWriterToDb )需要设置更高的值,因为我们有很多工作要做。既然没有 groupBy 这里需要,我用 shuffle() 在所有阶段之间。当一个特定的下游螺栓死亡,三叉戟预计将调用 Emitter 使用适当的旧元数据请求它重新发出。但是自从一次洗牌发生后 Emitter 的排放物会落在多个下游螺栓上。那么,三叉戟如何调用适当的发射器进行重新发射,以便重新发射完全相同的记录呢。即使三叉戟呼叫合适的发射器 Emitter 将重新发出整个10k批,其中一些记录只会失败。storm是如何处理整个序列的,我们在这里如何设计应用程序逻辑来处理一次语义的容错性。

mo49yndu

mo49yndu1#

使用trident时,整个批处理成功,或者整个批处理失败。当一个批处理失败时,喷口应该(自动)重放整个批处理,而您将无法在发出时从它的记录中进行挑选。
为了获得精确的一次语义,您的下游逻辑/db更新应该忽略重播的项(跟踪成功更新的项的批id),或者是幂等的。

相关问题