如果每次不启动/停止kafka,则kafka测试会间歇性失败

irlmq6kh  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(230)

我正在尝试使用嵌入式kafka集群对数据流运行一些集成测试。在与本地环境不同的环境中执行所有测试时,由于未正确删除某些内部状态,测试将失败。
当我在每个测试之前/之后启动/停止kafka集群时,我可以在非本地环境中运行所有测试,但我只想在执行测试套件的开始和结束时启动和停止集群一次。
我试图删除本地流状态,但似乎不起作用:

override protected def afterEach(): Unit = KStreamTestUtils.purgeLocalStreamsState(properties)

有没有一种方法可以让我的测试运行而不必每次启动/停止集群?
下面是相关的课程。

class TweetStreamProcessorSpec extends FeatureSpec
  with MockFactory with GivenWhenThen with Eventually with BeforeAndAfterEach with BeforeAndAfterAll {

  val CLUSTER: EmbeddedKafkaCluster = new EmbeddedKafkaCluster
  val TEST_TOPIC: String = "test_topic"
  val properties = new Properties()

  override def beforeAll(): Unit = {
    CLUSTER.start()
    CLUSTER.createTopic(TEST_TOPIC, 1, 1)
  }

  override def afterAll(): Unit = CLUSTER.stop()

  // if uncommenting these lines tests works
  // override def afterEach(): Unit = CLUSTER.stop() 
  // override protected def beforeEach(): Unit = CLUSTER.start()

  def createProducer: KafkaProducer[String, TweetEvent] = {
    val properties = Map(
      KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
      VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ReflectAvroSerializer[TweetEvent]].getName,
      BOOTSTRAP_SERVERS_CONFIG -> CLUSTER.bootstrapServers(),
      SCHEMA_REGISTRY_URL_CONFIG -> CLUSTER.schemaRegistryUrlForcedToLocalhost()
    )
    new KafkaProducer[String, TweetEvent](properties)
  }

  def kafkaConsumerSettings: KafkaConfig = {
    val bootstrapServers = CLUSTER.bootstrapServers()
    val schemaRegistryUrl = CLUSTER.schemaRegistryUrlForcedToLocalhost()
    val zookeeper = CLUSTER.zookeeperConnect()

    KafkaConfig(
      ConfigFactory.parseString(
        s"""
        akka.kafka.bootstrap.servers = "$bootstrapServers"
        akka.kafka.schema.registry.url = "$schemaRegistryUrl"
        akka.kafka.zookeeper.servers = "$zookeeper"
        akka.kafka.topic-name = "$TEST_TOPIC"
        akka.kafka.consumer.kafka-clients.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
        akka.kafka.consumer.kafka-clients.value.deserializer = ${classOf[ReflectAvroDeserializer[TweetEvent]].getName}
        akka.kafka.consumer.kafka-clients.client.id = client1
        akka.kafka.consumer.wakeup-timeout=20s
        akka.kafka.consumer.max-wakeups=10
      """).withFallback(ConfigFactory.load()).getConfig("akka.kafka")
    )
  }

  feature("Logging tweet data from kafka topic") {

    scenario("log id and payload when consuming a update tweet event") {
      publishEventsToKafka(List(upTweetEvent))
      val logger = Mockito.mock(classOf[Logger])
      val pipeline = new TweetStreamProcessor(kafkaConsumerSettings, logger)
      pipeline.start
      eventually(timeout(Span(5, Seconds))) {
        Mockito.verify(logger, Mockito.times(1)).info(s"updating tweet uuid=${upTweetEvent.getUuid}, payload=${upTweetEvent.getPayload}")
      }
      pipeline.stop
    }

    scenario("log id when consuming a delete tweet event") {
      publishEventsToKafka(List(delTweetEvent))
      val logger = Mockito.mock(classOf[Logger])
      val pipeline = new TweetStreamProcessor(kafkaConsumerSettings, logger)
      pipeline.start
      eventually(timeout(Span(5, Seconds))) {
        Mockito.verify(logger, Mockito.times(1)).info(s"deleting tweet uuid=${delTweetEvent.getUuid}")
      }
      pipeline.stop
    }
  }
}

class TweetStreamProcessor(kafkaConfig: KafkaConfig, logger: Logger)
  extends Lifecycle with TweetStreamProcessor with Logging {

  private var control: Control = _
  private val valueDeserializer: Option[Deserializer[TweetEvent]] = None

  // ...

  def tweetsSource(implicit mat: Materializer): Source[CommittableMessage[String, TweetEvent], Control] =
    Consumer.committableSource(tweetConsumerSettings, Subscriptions.topics(kafkaConfig.topicName))

  override def start: Future[Unit] = {
    control = tweetsSource(materializer)
      .mapAsync(1) { msg =>
        logTweetEvent(msg.record.value())
          .map(_ => msg.committableOffset)
      }.batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
      batch.updated(elem)
    }
      .mapAsync(3)(_.commitScaladsl())
      .to(Sink.ignore)
      .run()

    Future.successful()
  }

  override def stop: Future[Unit] = {
    control.shutdown()
      .map(_ => Unit)
  }
}

如果能帮上忙,我会非常感激的?提前谢谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题