找到java.util.concurrent.future所需的scala.concurrent.future

izkcnapc  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(538)

相关:java.util.concurrent.future的scala.concurrent.future Package 器
这来自我的另一个问题:
如何将akka流kafka(reactive kafka)集成到akka http应用程序中?
我有一个akka http应用程序,我想在我的路由中的oncomplete函数中向kafka发送一条消息/producerrecord,如下所示:

val producer : KafkaProducer = new KafkaProducer(producerSettings)

val routes : Route = 
  post {
    entity(as[User]) { user =>
      val createUser = userService.create(user)
      onSuccess(createUser) {
        case Invalid(y: NonEmptyList[Err]) =>  
          complete(BadRequest -> "invalid user")
        case Valid(u: User) => { 
          val producerRecord = 
            new ProducerRecord[Array[Byte], String]("topic1","some message")

          onComplete(producer.send(producerRecord)) { _ =>
            complete(ToResponseMarshallable((StatusCodes.Created, u)))
          }
        }
      }
    }
  }

但是,oncomplete(producer send producerrecord)正在生成以下类型不匹配错误:
[错误]找到:future[org.apache.kafka.clients.producer.recordmetadata](在java.util.concurrent中)[错误]必需:future[org.apache.kafka.clients.producer.recordmetadata](在scala.concurrent中)[错误]oncompleterecordmetadata{
有没有办法解决这个问题,也许把制作人当Flume(http://doc.akka.io/docs/akka-stream-kafka/current/producer.html#producer-作为接收器)而不是java producer.send函数?

vsikbqxv

vsikbqxv1#

为了回答您的具体问题,scala-java8-compat库提供了java8和scala之间的转换器。
具体来说,你可以使用 FutureConverters.toScala(producer.send(producerRecord)) 转换 java.util.concurrent.Futurescala.concurrent.Future 但是,使用一个本身具有scala友好api的客户机库(正如上面stefano所建议的那样)可能会得到最好的结果。

3yhwsihp

3yhwsihp2#

您可以利用cake的基于scala的kafka客户端,它将运行java futures并返回scala futures。一旦你确定你创建了一个 cakesolutions.kafka.KafkaProducer 而不是 org.apache.kafka.clients.producer.KafkaProducer ,其余代码实际上应该保持不变。
或者,您可以利用React式kafka解决这个问题,同时继续使用高级akkahttpdsl。你可以把你的制作人唱片放到Kafka的Flume里,这样做:

val producerSink = Producer.plainSink(producerSettings)

...
        // inside the route
        val producerRecord =
          new ProducerRecord[Array[Byte], String]("topic1", "some message")

        onComplete(Source.single(producerRecord).runWith(producerSink)) { _ =>
          complete(ToResponseMarshallable((StatusCodes.Created, u)))
        }

相关问题