apache trident运算符不总是执行

nwsw7zdq  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(232)

首先,我对风暴/三叉戟有点陌生,我已经为一个问题挣扎了好几个小时了。
我有一个Kafka主题和一个分区。生产者每x毫秒向该主题发送一个元组。TransactionalTridentKafkasOut读取此主题,一些trident操作符处理它们。整个拓扑在本地模式下运行(远程模式到目前为止还没有测试)。
拓扑的主要代码是:

TransactionalTridentKafkaSpout spout = new TransactionalTridentKafkaSpout(spoutConf);
TridentTopology topology = new TridentTopology();
Stream inStream = topology.newStream("kafka-spout", spout).parallelismHint(4);

TridentState state1=inStream
    .groupBy(new Fields(ID_FIELD))
    .persistentAggregate(new MemoryMapState.Factory(), new Fields(ID_FIELD, FIELD1, FIELD2, FIELD3), new CustomCombinerAgg1(), new Fields(COMB_AGG_1_FIELD))
    .parallelismHint(4);

state1.newValuesStream().groupBy(new Fields(ID_FIELD)).
    persistentAggregate(new MemoryMapState.Factory(), new Fields(ID_FIELD, COMB_AGG_1_FIELD), new CustomCombinerAgg2(), new Fields(COMB_AGG_2_FIELD))
    .parallelismHint(4);

state1.newValuesStream().filter(new Fields(ID_FIELD, COMB_AGG_1_FIELD), new CustomBaseFilter1());

inStream.groupBy(new Fields(ID_FIELD))
    .persistentAggregate(new MemoryMapState.Factory(), new Fields(ID_FIELD, FIELD1, FIELD2), new CustomCombinerAgg3(), new Fields(COMB_AGG_3_FIELD));

inStream.groupBy(new Fields(ID_FIELD))
    .persistentAggregate(new MemoryMapState.Factory(), new Fields(ID_FIELD, FIELD1, FIELD2, FIELD3), new CustomCombinerAgg4(), new Fields(COMB_AGG_4_FIELD))
    .newValuesStream().filter(new Fields(ID_FIELD, COMB_AGG_4_FIELD), new CustomBaseFilter2());

现在我遇到的问题是,生产者的消息间隔越低,执行的一些操作符就越少。
例如,如果生产者以100毫秒的间隔发送200个元组,则每个运算符都正确处理所有200个元组,但如果将间隔设置为20毫秒,则例如,运算符仅处理以下数量的元组:
客户组合agg1:200
客户组合agg2:50
custombasefilter1:50个
客户组合agg3:150
客户组合arg4:180
custombasefilter2:60个
据我所知(事务性)trident保证只进行一次处理,并且只有在前一个元组被完全处理之后,才能从喷口提取新的一批元组。这里的情况似乎不是这样的,看起来第一个操作符customcombineragg1决定了速度,接下来的操作符不能在给定的时间内处理所有元组?
我所期望的是,每个操作符对每个元组都会正确执行,一旦元组/批处理被所有操作符处理,下一个操作符就会被获取。这不是使用三叉戟的情况吗?我做错什么了吗?我怎样才能达到这种行为?
trident怎么知道元组何时被完全处理?据我所知,你必须在storm中ack()元组,但是trident操作符没有outputcollector,因此不能调用ack()?我的问题和这个有关系吗?
谢谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题