我的用例是识别实体,这些实体在x个实时时间段之后没有收到预期的事件,而不是使用批处理作业。例如:
如果我们在时间t收到paymentinitiated事件,但没有收到paymentfailed/paymentaborted/paymentSucked by t+x,则引发一个触发器,说明paymentstuck以及paymentinitiated事件的详细信息。
我如何在apachestorm中建模这样的用例,因为它是每个事件的滚动时间段x,而不是固定的时间间隔。
谢谢,哈里什
我的用例是识别实体,这些实体在x个实时时间段之后没有收到预期的事件,而不是使用批处理作业。例如:
如果我们在时间t收到paymentinitiated事件,但没有收到paymentfailed/paymentaborted/paymentSucked by t+x,则引发一个触发器,说明paymentstuck以及paymentinitiated事件的详细信息。
我如何在apachestorm中建模这样的用例,因为它是每个事件的滚动时间段x,而不是固定的时间间隔。
谢谢,哈里什
1条答案
按热度按时间tzcvj98z1#
对于storm,需要使用低级javaapi将所有逻辑放到udf代码中(我怀疑trindent是否有用)。我从来没有和samza一起工作过,也不能为它提供任何帮助(或者判断哪个系统更适合你的问题)。
例如,在storm中,可以为中的每个元组分配一个时间戳
Spout.nextTuple()
,并按时间戳的降序在一个bolt中缓冲未完成付款的所有元组。每次Bolt.execute()
调用时,可以将新元组的时间戳与队列的头(即最旧元组)进行比较。如果输入元组的timestamep大于head-t加x,那么就知道head元组超时了,可以触发它。当然,你需要这样做
fieldsGrouping()
确保属于同一付款的所有元组由同一个bolt示例处理。您可能还需要按时间戳对传入的bolt元组进行排序,或者使用更高级的超时逻辑来处理无序元组(关于增加时间戳)。根据您的延迟要求和输入流速率,您还可以使用“tick tuples”来触发head tuples与这个伪tick tuples的比较。或者作为一个更加严格的实现,直接在
Spout.next()
(如果您知道支付的所有元组都通过同一个spout示例)。