akka 不建议使用IOResult方法

zazmityj  于 2023-02-08  发布在  其他
关注(0)|答案(1)|浏览(131)

我是Scala新手,正在使用方法def过程,但是遇到了构建错误,如“IOResult类中的方法wasSuccessful已被弃用(自2.6.0起):如果sftpResult.wasSuccessful =〉",则状态总是设置为Success(Done)case Success((sftpResult,updateList))。我的问题是如何在我的代码中实现它?如果这个问题看起来很愚蠢,我道歉。

override def process(changedAfter: Option[ZonedDateTime], changedBefore: Option[ZonedDateTime])
                        (implicit fc: FlowContext): Future[(IOResult, Seq[Operation.Value])] = {
      val now = nowUtc
      val accountUpdatesFromDeletions: Source[AccountUpdate, NotUsed] =
        dbService
          .getDeleteActions(before = now)
          .map(deletionEventToAccountUpdate)

      val result = getAccountUpdates(changedAfter, changedBefore)
        .merge(getErrorUpdates)
        .merge(accountUpdatesFromDeletions)
        .mapAsync(4)(writer.writePSVString)
        .viaMat(creditBureauService.sendUpdate)(Keep.right)
        .mapAsync(4)(au =>
          for {
            _ <- dbService.performUpdate(au)
            _ <- performActionsDelete(now, au)
          } yield au.operation
        )
        .toMat(Sink.seq)(Keep.both)
        .withAttributes(ActorAttributes.supervisionStrategy(decider))
        .run()

      tupleFutureToFutureTuple(result) andThen {
        case Success((sftpResult, updateList)) if sftpResult.wasSuccessful =>
          val total = updateList.size
          val deleted = updateList.count(_ == Operation.DELETED)
          val updated = updateList.count(_ == Operation.UPDATED)
          val inserted = updateList.count(_ == Operation.INSERTED)
          log.info(s"SUCCESS! Uploaded $total accounts to Equifax.")
          log.info(s"There were $deleted deletions, " +
            s"$updated updates and " +
            s"$inserted insertions to the database")
          monitor.gauge("upload.process.batch.successful.total", total)
          monitor.gauge("upload.process.batch.successful.deleted", deleted)
          monitor.gauge("upload.process.batch.successful.updated", updated)
          monitor.gauge("upload.process.batch.successful.inserted", inserted)
        case Success((sftpResult, _)) if !sftpResult.wasSuccessful =>
          val sw = new StringWriter
          sftpResult.getError.printStackTrace(new PrintWriter(sw))
          log.error("Failed to send data: " + sw.toString)
          monitor.gauge("upload.process.batch.sftp.failed", 1)
        case Failure(e: Throwable) =>
          val sw = new StringWriter
          e.printStackTrace(new PrintWriter(sw))
          log.error(sw.toString)
          monitor.gauge("upload.process.batch.failed", 1)
      }
    }
kg7wmglp

kg7wmglp1#

wasSuccessfulgetError2.6开始就被弃用了。由于IOResult通常被 Package 在Future中,而Future本身已经表明I/O操作是成功还是失败,因此成功返回的IOResultstatus参数本质上是冗余的,因此现在总是设置为Success[Done]
考虑到这一点,你可以简化/重构你的大小写匹配片段如下:

tupleFutureToFutureTuple(result) andThen {
    case Success((sftpResult, updateList)) =>
      val total = updateList.size
      val deleted = updateList.count(_ == Operation.DELETED)
      val updated = updateList.count(_ == Operation.UPDATED)
      val inserted = updateList.count(_ == Operation.INSERTED)
      log.info(s"SUCCESS! Uploaded $total accounts to Equifax.")
      log.info(s"There were $deleted deletions, " +
        s"$updated updates and " +
        s"$inserted insertions to the database")
      monitor.gauge("upload.process.batch.successful.total", total)
      monitor.gauge("upload.process.batch.successful.deleted", deleted)
      monitor.gauge("upload.process.batch.successful.updated", updated)
      monitor.gauge("upload.process.batch.successful.inserted", inserted)
    case Failure(e: Throwable) =>
      val sw = new StringWriter
      e.printStackTrace(new PrintWriter(sw))
      log.error(sw.toString)
      monitor.gauge("upload.process.batch.failed", 1)
  }

相关问题