如何将akka流kafka(React式kafka)集成到akka http应用程序中?

vddsk6oq  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(453)

我有一个基本的scalaakkahttpcrud应用程序。相关课程见下文。
我只想在创建/更新实体时将实体id和一些数据(如json)写入kafka主题。
我在看http://doc.akka.io/docs/akka-stream-kafka/current/producer.html,但我对scala和akka是新手,不确定如何将其集成到我的应用程序中?
例如,从上面的文档来看,这是一个制作人写给Kafka的例子,所以我想我需要一些类似的东西,但是在我的应用程序中这个应该去哪里呢?在创建用户之后,是否可以在服务的create方法中添加另一个map调用?
非常感谢!

val done = Source(1 to 100)
  .map(_.toString)
  .map { elem =>
    new ProducerRecord[Array[Byte], String]("topic1", elem)
  }
  .runWith(Producer.plainSink(producerSettings))

或者我需要做一些类似的例子吗https://github.com/hseeberger/accessus 在my server.scala的bindandhandle()方法中?
Web服务器.scala

object System {

  implicit val system = ActorSystem()
  implicit val dispatcher = system.dispatcher
  implicit val actorMaterializer = ActorMaterializer()

}

object WebServer extends App {

  import System._

  val config = new ApplicationConfig() with ConfigLoader
  ConfigurationFactory.setConfigurationFactory(new LoggingConfFileConfigurationFactory(config.loggingConfig))
  val injector = Guice.createInjector(new MyAppModule(config))
  val routes = injector.getInstance(classOf[Router]).routes
  Http().bindAndHandle(routes, config.httpConfig.interface, config.httpConfig.port)

}

路由器.scala

def routes(): Route = {
    post {
      entity(as[User]) { user =>
        val createUser = userService.create(user)
        onSuccess(createUser) {
          case Invalid(y: NonEmptyList[Err]) =>  {
            throw new ValidationException(y)
          }
          case Valid(u: User) => {
              complete(ToResponseMarshallable((StatusCodes.Created, u)))
          }
        }
      }
    } ~
    // More routes here, left out for example  
}

服务.scala

def create(user: User): Future[MaybeValid[User]] = {
    for {
      validating <- userValidation.validateCreate(user)
      result <- validating match {
        case Valid(x: User) =>
          userRepo.create(x)
            .map(dbUser => Valid(UserConverters.fromUserRow(x)))
        case y: DefInvalid =>
          Future{y}
      }
    } yield result
  }

回购scala

def create(user: User): Future[User] = {
    mutateDbProvider.db.run(
      userTable returning userTable.map(_.userId)
        into ((user, id) => user.copy(userId = id)) +=
        user.copy(createdDate = Some(Timestamp.valueOf(LocalDateTime.now())))
    )
  }
vulvrdjw

vulvrdjw1#

既然你写了你的 Route 只需要1个 UserEntity 我觉得你不需要 Producer.plainSink . 相反,我认为 Producer.send 也可以。另外,顺便说一句,抛出异常并不是scala的“惯用用法”。所以我改变了无效用户的逻辑:

val producer : KafkaProducer = new KafkaProducer(producerSettings)

val routes : Route = 
  post {
    entity(as[User]) { user =>
      val createUser = userService.create(user)
      onSuccess(createUser) {
        case Invalid(y: NonEmptyList[Err]) =>  
          complete(BadRequest -> "invalid user")
        case Valid(u: User) => { 
          val producerRecord = 
            new ProducerRecord[Array[Byte], String]("topic1",s"""{"userId" : ${u.userId}, "entity" : "User"}""")

          onComplete(producer send producerRecord) { _ =>
            complete(ToResponseMarshallable((StatusCodes.Created, u)))
          }
        }
      }
    }
  }

相关问题