我有一个spark流应用程序来分析来自kafka代理的事件。我有如下规则,可以通过合并现有规则来生成新规则:
If this event type occurs raise an alert.
If this event type occurs more than 3 times in a 5-minute interval, raise an alert.
同时,我将所有传入的数据保存到cassandra。我喜欢做的是运行这个流应用程序从Cassandra的历史数据。例如,
<This rule> would have generated <these> alerts for <last week>.
在spark或roadmap中有什么方法可以做到这一点吗?例如,apache flink具有事件时间处理。但是将现有的代码库迁移到它似乎很困难,我想通过重用现有代码来解决这个问题。
1条答案
按热度按时间t1rydlwq1#
这是相当直接的,有一些警告。首先,这有助于从Kafka的Angular 理解这一点。
kafka管理所谓的偏移量——kafka中的每条消息相对于其在分区中的位置都有一个偏移量(分区是主题的逻辑分区。)分区中的第一条消息的偏移量为
0L
,第二个是1L
除此之外,由于原木翻转和可能的主题压缩,0L
并不总是分区中最早的偏移量。首先要做的是收集所有要从头读取的分区的偏移量。下面是一个函数:
你可以这样称呼它:
关于从Kafka那里获取偏移量的所有知识,请阅读以下内容。至少可以说,这很神秘(当你完全理解第二个论点时,请告诉我
PartitionOffsetRequestInfo
,例如。)既然你有
firstOffset
以及lastOffset
然后使用fromOffset
的参数createDirectStream
,类型为:fromOffset: Map[TopicAndPartition, Long]
. 你可以设定Long
/对客户的价值firstOffset
你从哪里得到的getOffsets()
.至于
nextOffset
--当您从处理历史数据转移到新数据时,您可以使用它来确定流中的数据。如果msg.offset == nextOffset
然后处理分区内的第一条非历史记录。现在直接从文档中获取注意事项:
一旦上下文启动,就不能设置或添加新的流计算。
上下文一旦停止,就不能重新启动。
一个jvm中同时只能有一个streamingcontext处于活动状态。
streamingcontext上的stop()也会停止sparkcontext。要仅停止streamingcontext,请将stop()的可选参数stopsparkcontext设置为false。
只要在创建下一个streamingcontext之前停止上一个streamingcontext(不停止sparkcontext),sparkcontext就可以重新用于创建多个streamingcontext。
正是因为这些警告我才明白
nextOffset
同时firstOffset
--因此,我可以保持流向上,但改变的背景,从历史到现在的时间处理。