spark流媒体重播

bihw5rsg  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(339)

我有一个spark流应用程序来分析来自kafka代理的事件。我有如下规则,可以通过合并现有规则来生成新规则:

If this event type occurs raise an alert.
If this event type occurs more than 3 times in a 5-minute interval, raise an alert.

同时,我将所有传入的数据保存到cassandra。我喜欢做的是运行这个流应用程序从Cassandra的历史数据。例如,

<This rule> would have generated <these> alerts for <last week>.

在spark或roadmap中有什么方法可以做到这一点吗?例如,apache flink具有事件时间处理。但是将现有的代码库迁移到它似乎很困难,我想通过重用现有代码来解决这个问题。

t1rydlwq

t1rydlwq1#

这是相当直接的,有一些警告。首先,这有助于从Kafka的Angular 理解这一点。
kafka管理所谓的偏移量——kafka中的每条消息相对于其在分区中的位置都有一个偏移量(分区是主题的逻辑分区。)分区中的第一条消息的偏移量为 0L ,第二个是 1L 除此之外,由于原木翻转和可能的主题压缩, 0L 并不总是分区中最早的偏移量。
首先要做的是收集所有要从头读取的分区的偏移量。下面是一个函数:

def getOffsets(consumer: SimpleConsumer, topic: String, partition: Int) : (Long,Long) = {
  val time = kafka.api.OffsetRequest.LatestTime
  val reqInfo = Map[TopicAndPartition,PartitionOffsetRequestInfo](
    (new TopicAndPartition(topic, partition)) -> (new PartitionOffsetRequestInfo(time, 1000))
  )
  val req = new kafka.javaapi.OffsetRequest(
    reqInfo, kafka.api.OffsetRequest.CurrentVersion, "test"
  )
  val resp = consumer.getOffsetsBefore(req)
  val offsets = resp.offsets(topic, partition)
  (offsets(offsets.size - 1), offsets(0))
}

你可以这样称呼它:

val (firstOffset,nextOffset) = getOffsets(consumer, "MyTopicName", 0)

关于从Kafka那里获取偏移量的所有知识,请阅读以下内容。至少可以说,这很神秘(当你完全理解第二个论点时,请告诉我 PartitionOffsetRequestInfo ,例如。)
既然你有 firstOffset 以及 lastOffset 然后使用 fromOffset 的参数 createDirectStream ,类型为: fromOffset: Map[TopicAndPartition, Long] . 你可以设定 Long /对客户的价值 firstOffset 你从哪里得到的 getOffsets() .
至于 nextOffset --当您从处理历史数据转移到新数据时,您可以使用它来确定流中的数据。如果 msg.offset == nextOffset 然后处理分区内的第一条非历史记录。
现在直接从文档中获取注意事项:
一旦上下文启动,就不能设置或添加新的流计算。
上下文一旦停止,就不能重新启动。
一个jvm中同时只能有一个streamingcontext处于活动状态。
streamingcontext上的stop()也会停止sparkcontext。要仅停止streamingcontext,请将stop()的可选参数stopsparkcontext设置为false。
只要在创建下一个streamingcontext之前停止上一个streamingcontext(不停止sparkcontext),sparkcontext就可以重新用于创建多个streamingcontext。
正是因为这些警告我才明白 nextOffset 同时 firstOffset --因此,我可以保持流向上,但改变的背景,从历史到现在的时间处理。

相关问题