Akka流输入('In')作为输出('Out')

qxsslcnc  于 2022-11-05  发布在  其他
关注(0)|答案(1)|浏览(116)

我正在尝试写一段代码,它做以下:-
1.从远程源(如s3)读取大型csv文件。
1.逐记录处理文件。
1.向用户发送通知
1.将输出写入远程位置
输入csv中的示例记录:

recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000

表示输入csv中记录的输入case类:
case class OutputRecord(recordId: String, name: String, designation: String)
输出csv中的示例记录(需要写入):

recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager

我的输出case类,它表示输入csv中的记录:
case class OutputRecord(recordId: String, name: String, designation: String)
使用akka流csv阅读记录(使用Alpakka无功s3 https://doc.akka.io/docs/alpakka/current/s3.html):

def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] = 
S3.download(s3Object.bucket, s3Object.path)
      .runWith(Sink.head)
// This is then converted to csv

现在我有一个函数来处理记录:

def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer

将OutputRecord写入为csv的函数

def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] = 
S3.multipartUpload(s3Object.bucket,
                       s3Object.path,
                       metaHeaders = MetaHeaders(Map())

发送电子邮件通知的功能:

def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info

把它们缝合在一起

readAsCSV.flatMap { recordSource =>
  recordSource.map { record
    val outputRecord = process(record)
    outputRecord
  }
  .via(notify) //Error: Line 15
  .to(writeOutput) //Error: Line 16
  .run()
}

在第15行和第16行出现错误,我可以添加第15行或第16行,但不能同时添加这两行,因为notifywriteOutput都需要outputRecord。调用notify后,我丢失了outputRecord
是否可以将notifywriteOutput添加到同一个图表中?

我并不是在寻找并行执行,因为我想首先调用notify,然后只调用writeOutput。因此,这没有帮助:https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing
用例对我来说似乎很简单,但有些我怎么也找不到一个干净的解决方案。

sgtfey8w

sgtfey8w1#

notify的输出是一个PushResult,但是writeOutput的输入是ByteString。一旦你改变了它,它就会编译。如果你需要ByteString,从OutputRecord得到相同的。
顺便说一句,在您提供的示例代码中,readCSVprocess中也存在类似的错误。

相关问题