如何在akka流中只保存流的唯一值?

pnwntuvh  于 2022-12-23  发布在  其他
关注(0)|答案(1)|浏览(373)

我有一个日志文件,是饲料数据,每次访问一些网站(它实际上只是一个Python程序,模拟这一点),我想计算访问每个网站的数量。日志文件不断更新

val logFile = Paths.get("./src/main/scala/log-generator.log")
 val source =   FileTailSource(logFile,maxChunkSize = 4096,startingPosition=0L,pollingInterval = 5000.millis).via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 4096, allowTruncation = true))

// Parse each line of the log file to extract the website name
val websiteFlow = Flow[ByteString].map { line =>
      val fields = line.utf8String.split(" ")
      val website = fields(2)
      (website,1)
    }.groupBy(Int.MaxValue,_._1)
    .scan(("",0)) { case ((_, count), (website, _)) => (website, count + 1) }
    .filter{case (website, count) => count > 0}
    .map { case (website,count) => s"($website, $count)\n" }
    .map(s=>ByteString(s))
    .mergeSubstreams
val sink_website = FileIO.toPath(Paths.get("./websites.csv")) 

val f = source.via(websiteFlow).runWith(sink_website)

while (true){
Await.result(f,60.second)
}
}

我使用FileTailSource作为源和scan tp处理每一个条目,但我不太得到我想要的结果,我得到一个不断增加的计数为每个网站:是否可以更改代码,使其仅输出每个网站的最新计数?

11dmarpk

11dmarpk1#

以下是我将如何解决你的问题(我希望在此过程中回答你的问题)。

1)实施Source,在每次访问网站时发送网站:

// emits the name of a website each time is emitted
  val websitesSource: Source[String, NotUsed] = {
     // your code should work to implent this, just emit `website`
     // instead of `(website, 1)`
  }

2)实现一个Flow,用于累加Map中的计数器:

val countingFlow: Flow[String, Map[String, Int], NotUsed] =
    Flow[String]
      .scan(Map[String, Int]()) { (acc, website) =>
        val newCount = acc.getOrElse(website, 0) + 1
        acc + (website -> newCount)
      }
      // drop 1 because scan will emit the initial empty map
      .drop(1)

(Note该解决方案假设不同网站的总量足够小,使得这样的Map可以保存在存储器中。

3)实现一个Sink,每当更新统计信息时,它都会重写整个文件。

你的问题是:FileIO.toPath会在值更新时将新字符串附加到文件中,而您希望覆盖旧文件内容。我不知道在akka中是否有现成的解决方案,因此我将执行以下操作:

/** Writes the full map into a file, overwriting any existing content. */
  def writeCountsToFile(counts: Map[String, Int]): Future[Unit] = {
    // you'll figure out how to do this
    ???
  }

  // uses writeCountsToFile to write the new file content on any update
  val sink: Sink[Map[String, Int], Future[Done]] =
    Sink.foreachAsync(1)(writeCountsToFile)

4)最后:把所有的东西结合起来

对于上游源发出更新的速度比文件写入接收器写入更新的速度快的理论情况,我将插入一个缓冲区来丢弃旧的计数器Map(无论如何都会被覆盖):

websitesSource
    .via(countingFlow)
    .buffer(1, OverflowStrategy.dropTail)
    .to(sink)
    .run()

显然,我还没有用实际的文件测试过,但是我非常乐观地认为这是可行的。
顺便说一句:分离这三个组件(io相关的源和接收器+“逻辑”流)的好处在于,这允许在没有io文件的情况下测试逻辑--参见https://scastie.scala-lang.org/nrOjPHYQSzazXpxwIkPd0A以获得一些可以使用的东西。

相关问题