fs2 kafka使用者获取类型不匹配错误发现fs2.stream required cats.effect.io

wz1wpwve  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(236)

我一直在学习fs2Kafka的例子。不过,对于一个消费者来说,我还是坚持使用这个例子。我遇到的问题是fs2.stream和cats.effect.io之间的类型不匹配(错误如下)
代码:nb:现在更新了@alexeynovakov的建议,提供了一个工作示例

package pb.streams

import cats.effect.{ContextShift, Timer}

import fs2.kafka._
import fs2.kafka.{AutoOffsetReset, ConsumerSettings}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization._

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.concurrent.forkjoin.ForkJoinPool
import scala.concurrent.duration._

import cats.implicits._
import cats.effect.IO

object Consumer {

  implicit val ec: ExecutionContextExecutor =
    ExecutionContext.fromExecutor(new ForkJoinPool(4))

  implicit val contextShift: ContextShift[IO] = IO.contextShift(ec)
  implicit val timer: Timer[IO] = IO.timer(ec)

  def main(args: Array[String]): Unit = {
    consumeFeed()
    ()
  }

  def processRecord(record: ConsumerRecord[String, String]): IO[Unit] = {
    println(s"${record.key()} => ${record.value()}")

    IO.unit
  }

  def consumeFeed()= {

    val consumerSettings = (executionContext: ExecutionContext) ⇒
      ConsumerSettings(
        keyDeserializer   = new StringDeserializer,
        valueDeserializer = new StringDeserializer,
        executionContext  = executionContext
      )
        .withAutoOffsetReset(AutoOffsetReset.Earliest)
        .withBootstrapServers("localhost:9092")
        .withPollTimeout(250.milliseconds)
        .withGroupId("group")

      for {
        executionContext    ← consumerExecutionContextStream[IO]
        consumer            ← consumerStream[IO].using(consumerSettings(executionContext))
        _                   ← fs2.Stream.eval(consumer.subscribeTo("topic-inbox"))
        _                   ← consumer.stream
                              .mapAsync( 4) { message ⇒
                                processRecord(message.record)
                                  .as(message.committableOffset)
                              }
                              .groupWithin(500, 15.seconds)
                              .map(_.foldLeft(CommittableOffsetBatch.empty[IO])(_ updated _))
                              .evalMap(_.commit)
      } yield ()

  }
}

我似乎在编译时遇到的错误是:

Error:(55, 29) type mismatch;
 found   : fs2.Stream[[x]cats.effect.IO[x],Unit]
 required: cats.effect.IO[?]
        _                   ← consumer.stream

Error:(54, 29) type mismatch;
 found   : cats.effect.IO[Nothing]
 required: fs2.Stream[?,?]
        _                   ← consumer.subscribeTo("topic-inbox")

Error:(55, 9) parameter value consumer in value $anonfun is never used
        consumer            ← consumerStream[IO].using(consumerSettings(executionContext))

有谁能提供一些见解来帮助我理解和修复这个神秘的错误吗?我试过各种方法来解决这个问题,但都没有用。任何帮助将不胜感激,因为我似乎无法谷歌类似的情况。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题