在我们的系统中,有多个数据生成器在共享文件系统中创建文件内容,并指示 DataSourceId
在文件名中。需要有一个公平的调度机制来读取所有源生成的文件,解析、展平和丰富(使用参考数据)文件中的数据记录,批处理丰富的记录并写入数据库。
我使用 IPartitionedTridentSpout
. 拓扑结构如下所示:
TransactionalTridentEsrSpout spout
= new TransactionalTridentEsrSpout(NUM_OF_PARTITIONS);
TridentTopology topology = new TridentTopology();
topology.newStream("FileHandlerSpout", spout)
.each(new Fields("filename", "esr"), new Utils.PrintFilter())
.parallelismHint(NUM_OF_PARTITIONS)
.shuffle()
.each(new Fields("filename", "record"), new RecordFlattenerAndEnricher(), new elds("record-enriched"))
.each(new Fields("filename", "record-enriched"), new Utils.PrintFilter())
.project(new Fields(record-enriched")) // pass only required
.parallelismHint(PARALLELISM_HINT_FOR_ESR_FLATTENER_ENRICHER)
.shuffle()
.aggregate(new Fields("record-enriched"), new BlockWriterToDb(), new Fields("something"))
.each(new Fields("something"), new Utils.PrintFilter())
.parallelismHint(PARALLELISM_HINT_FOR_GP_WRITER);
由于数据文件非常大(通常是100万条记录),我读取了小批量的10k条记录。每 transactionId
生成者 Coordinator
,我的 Emitter
发出分区中当前/下一个文件的下一个10k记录。决赛 BlockWriter
将丰富的记录聚合到缓冲区中,并在“complete”方法调用时,将缓冲区写入db。
拓扑工作正常,但我有以下问题:
这个 parallelismHint
对于 ParttionedTridentSpout
,这会影响 Emitters
,设置为分区数。这个 parallelismHint
接下来的两层 FlattenerAndEnricher
以及 BlockWriterToDb
)需要设置更高的值,因为我们有很多工作要做。既然没有 groupBy
这里需要,我用 shuffle()
在所有阶段之间。当一个特定的下游螺栓死亡,三叉戟预计将调用 Emitter
使用适当的旧元数据请求它重新发出。但是自从一次洗牌发生后 Emitter
的排放物会落在多个下游螺栓上。那么,三叉戟如何调用适当的发射器进行重新发射,以便重新发射完全相同的记录呢。即使三叉戟呼叫合适的发射器 Emitter
将重新发出整个10k批,其中一些记录只会失败。storm是如何处理整个序列的,我们在这里如何设计应用程序逻辑来处理一次语义的容错性。
1条答案
按热度按时间mo49yndu1#
使用trident时,整个批处理成功,或者整个批处理失败。当一个批处理失败时,喷口应该(自动)重放整个批处理,而您将无法在发出时从它的记录中进行挑选。
为了获得精确的一次语义,您的下游逻辑/db更新应该忽略重播的项(跟踪成功更新的项的批id),或者是幂等的。