在Storm中按顺序处理记录

jqjz2hbq  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(334)

我是新来的风暴,我有问题,以找出如何处理记录的顺序。
我有一个数据集,其中包含具有以下字段的记录:
用户id、位置id、检查时间
现在,我想确定已经完成我指定的路径的用户(例如,从位置a到位置b再到位置c的用户)。
我使用Kafka生产者,并从一个文件中读取这些记录,以模拟实时数据。数据按日期排序。
所以,为了检查我的模式是否满足,我需要按顺序处理记录。问题是,由于并行化(bolt复制),我没有按顺序获得用户的签入。因为这种模式行不通。
如何克服这个问题?如何按顺序处理记录?

vsdwdz23

vsdwdz231#

storm支持这个用例。为此,您只需确保在所有相关组件的整个流程中保持秩序。因此,作为第一步,在kafka producer中,特定用户id的所有消息都应转到kafka中的同一分区。为此,您可以在kafkaproducer中实现一个自定义分区器。有关实施的详细信息,请参阅此处的链接。
由于kafka中的一个分区只能被storm中的一个kafkaspout示例读取,因此该分区中的消息在spout示例中按顺序排列。从而确保相同用户id的所有消息到达相同的出口。
现在是棘手的部分-要维护bolt中的顺序,您需要确保基于kafka喷口发出的“user\u id”字段对bolt使用字段分组。如果提供的kafkaspout不中断message to emit字段,则必须重写kafkaspout以读取消息并从喷口发出“user\u id”字段。一种方法是使用一个中间螺栓从kafkaspout读取消息并发出一个带有“user\u id”字段的流。
当您最后在“user\u id”上指定一个具有字段分组的bolt时,特定user\u id值的所有消息都将转到该bolt的同一个示例,无论该bolt的并行度如何。
适用于您的案例的示例拓扑如下-
生成器设置输出(“kafkaspout”,kafkaspout);
builder.setbolt(“fieldsemitterbolt”,fieldsemitterbolt).shufflegrouping(“kafkaspout”);
builder.setbolt(“calculatorbolt”,calculatorbolt).fieldsgrouping(“fieldsemitterbolt”,new fields(“user\u id”))//bolt2发出的用户id字段
--注意,如果用户id的数量有限,则可能会出现所有用户id值都到达同一calculatorbolt示例的情况。这反过来又会降低有效的“并行性”!

fnx2tebb

fnx2tebb2#

storm中的有序处理没有通用的系统支持。要么您使用一个不同的系统来支持有序的steam处理,比如apache flink(免责声明,我是flink的提交者),要么您需要自己在bolt代码中处理它。
风暴提供的唯一支援就是使用三叉戟。您可以将特定时间段(例如一分钟)的元组放入单个批中。因此,您可以一次在一分钟内处理所有元组。然而,这只有在用例允许的情况下才有效,因为不能将不同批处理中的元组相互关联。在您的例子中,只有当您知道有一些时间点,所有用户都到达了他们的目的地(并且没有其他用户开始新的交互)时,才会出现这种情况;也就是说,你需要两个用户没有重叠的时间点(在我看来,您的用例不能满足这个需求)。
对于非系统,即基于自定义用户代码的解决方案,将有两种方法:
例如,您可以缓冲元组并在处理之前对bolt中的时间戳进行排序。要使其正常工作,需要注入标点/水印,以确保时间戳大于标点的元组不会出现在标点之后。如果从每个并行输入子流接收到标点符号,则可以安全地触发排序和处理。
另一种方法是在区域缓冲区中为每个传入的子流缓冲元组(保留子流顺序内的元组),并按顺序合并缓冲区中的元组。这样做的好处是避免了排序。但是,您需要确保每个操作符都发出有序的元组。此外,为了避免阻塞(即,如果没有可用于子流的输入),也可能需要标点符号(我实现了这个方法。请随意使用代码或根据您的需要进行调整:https://github.com/mjsax/aeolus/blob/master/queries/utils/src/main/java/de/hub/cs/dbis/aeolus/utils/timestampmerger.java)

相关问题