使用Akka Streams阅读CSV文件

jecbmhm3  于 2022-11-06  发布在  其他
关注(0)|答案(4)|浏览(168)

我正在阅读一个csv文件。我正在使用Akka Streams来做这个,这样我就可以创建一个在每一行上执行的动作的图形。我已经启动并运行了下面的玩具示例。

def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("MyAkkaSystem")
    implicit val materializer = ActorMaterializer()

        val source = akka.stream.scaladsl.Source.fromIterator(Source.fromFile("a.csv").getLines)
        val sink = Sink.foreach(println)
        source.runWith(sink)
      }

这两个Source类型让我很不舒服。这是惯用的,还是有更好的写法?

siotufzp

siotufzp1#

实际上,akka-streams提供了一个直接从文件中读取的函数。

FileIO.fromPath(Paths.get("a.csv"))
      .via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
      .runForeach(println)

这里,runForeach方法是打印行。如果你有合适的Sink来处理这些行,就用它来代替这个函数。例如,如果你想按'拆分行并打印其中的总字数:

val sink: Sink[String] = Sink.foreach(x => println(x.split(",").size))

FileIO.fromPath(Paths.get("a.csv"))
      .via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
      .to(sink)
      .run()
gcmastyq

gcmastyq2#

使用Akka Streams读取CSV文件的惯用方法是使用Alpakka CSV connector。以下示例读取CSV文件,将其转换为列名(假定为文件中的第一行)和ByteString值的Map,将ByteString值转换为String值,并打印每一行:

FileIO.fromPath(Paths.get("a.csv"))
  .via(CsvParsing.lineScanner())
  .via(CsvToMap.toMap())
  .map(_.mapValues(_.utf8String))
  .runForeach(println)
qvsjd97n

qvsjd97n3#

试试看:

import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object ReadStreamApp extends App {
  implicit val actorSystem = ActorSystem()
  import actorSystem.dispatcher
  implicit val flowMaterializer = ActorMaterializer()

  val logFile = Paths.get("src/main/resources/a.csv")

  val source = FileIO.fromPath(logFile)

  val flow = Framing
    .delimiter(ByteString(System.lineSeparator()), maximumFrameLength = 512, allowTruncation = true)
    .map(_.utf8String)

  val sink = Sink.foreach(println)

  source
    .via(flow)
    .runWith(sink)
    .andThen {
      case _ =>
        actorSystem.terminate()
        Await.ready(actorSystem.whenTerminated, 1 minute)
    }
}
liwlm1x9

liwlm1x94#

是的,这是可以的,因为这些是不同的Source。但是如果你不喜欢scala.io.Source,你可以自己读取文件(有时我们不得不这样做,例如,源csv文件是压缩的),然后使用给定的InputStream解析它,如下所示

StreamConverters.fromInputStream(() => input)
  .via(Framing.delimiter(ByteString("\n"), 4096))
  .map(_.utf8String)
  .collect { line =>
    line
  }

话虽如此,考虑使用Apache Commons CSV与akka-stream。您可能最终编写更少的代码:)

相关问题