在scala的akka中,集成测试只是断断续续地通过

zf9nrax1  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(442)

我正在尝试用scala为kafka编写集成测试(对这两个都有点陌生);我的意思是我有一个
ClosedShape RunnableGraph 在我的主代码中,我希望通过一个kafka主题将数据输入到其中,然后检查通过kafka主题得到的结果(而不是对其中的单个流进行单元测试) RunnableGraph ).
下面是一个简化的示例:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.kafka.{ProducerSettings, ConsumerSettings}
import akka.kafka.scaladsl.{Producer, Consumer}
import akka.kafka.scaladsl.Consumer.Control
import akka.stream.ClosedShape
import akka.stream.scaladsl._
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerConfig}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringSerializer, StringDeserializer}
import GraphDSL.Implicits._

object SimpleKafkaStream {

  def apply(sourceTopic: String, targetTopic: String, kafkaBootstrapServer: String) (implicit actorSystem: ActorSystem) = {

    RunnableGraph.fromGraph (GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      source(sourceTopic, kafkaBootstrapServer) ~> transformMessage(targetTopic) ~> target(kafkaBootstrapServer)
      ClosedShape
    })
  }

  private def transformMessage (targetTopic: String) = Flow[ConsumerRecord[String, String]]
    .map (_.value())
    .map ("hello " + _)
    .map (message => { new ProducerRecord[String, String] (targetTopic, message) })

  private def source (topic: String, bootstrapServer: String) (implicit actorSystem: ActorSystem) : Source[ConsumerRecord[String, String], Control] = {
    val consumerSettings = ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer, Set(topic))
      .withBootstrapServers(bootstrapServer)
      .withGroupId(s"consumer_1_.$topic")
      .withClientId(s"consumer_1_.$topic")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    Consumer.plainSource(consumerSettings)
  }

  private def target (bootstrapServer: String) (implicit actorSystem: ActorSystem) = {
    Producer.plainSink(ProducerSettings(actorSystem, new StringSerializer, new StringSerializer)
      .withBootstrapServers(bootstrapServer))
  }
}

然后用以下方法进行测试:

import java.util.UUID

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, ProducerSettings}
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.stream.ActorMaterializer
import akka.stream.testkit.javadsl.TestSink
import akka.stream.testkit.scaladsl.TestSource
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.scalatest.{Matchers, WordSpec}

class SimpleKafkaStreamTest extends WordSpec with Matchers {

  "A person should be greeted" in new TestScope {
    startStream()
    send("World")
    requestNext() shouldBe "hello World"
  }

  trait TestScope extends E2EConfiguration with Kafka

  trait E2EConfiguration {
    implicit val actorSystem = ActorSystem("e2e-system")
    implicit val actorMaterializer = ActorMaterializer()
    val kafkaBootstrapServer = "192.168.99.100:9092"
    val sourceTopic = "person"
    val targetTopic = "greeting"
  }

  trait Kafka {
    this: E2EConfiguration =>

    private val consumerSettings = ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer, Set(targetTopic))
      .withBootstrapServers(kafkaBootstrapServer)
      .withGroupId(UUID.randomUUID().toString)
      .withClientId(UUID.randomUUID().toString)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

    val kafkaInputSource =
      TestSource.probe[String].map( name => {
        new ProducerRecord[String, String] (sourceTopic, name)
    }).to(Producer.plainSink(ProducerSettings(actorSystem, new StringSerializer, new StringSerializer)
      .withBootstrapServers(bootstrapServers = kafkaBootstrapServer))).run()

    val kafkaOutput = Consumer.plainSource(consumerSettings).runWith(TestSink.probe(actorSystem))
    def requestNext() = kafkaOutput.requestNext.value

    def send(name: String) = kafkaInputSource.sendNext(name)

    def startStream() = {
      SimpleKafkaStream(sourceTopic = sourceTopic, targetTopic = targetTopic, kafkaBootstrapServer = kafkaBootstrapServer).run()
    }
  }
}

所以,这应该写“世界”的主题“人”,并得到回“世界你好”的主题“问候”。。。偶尔也会发生这种情况。然而,大多数时候,我得到:

Expected OnNext(_), yet no element signaled during 3 seconds
java.lang.AssertionError: Expected OnNext(_), yet no element signaled during 3 seconds
    at akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:268)
    at akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:259)
    at akka.stream.testkit.TestSubscriber$Probe.requestNext(StreamTestKit.scala:631)
    at kafka.SimpleKafkaStreamTest$Kafka$class.requestNext(SimpleKafkaStreamTest.scala:56)
    at kafka.SimpleKafkaStreamTest$$anonfun$1$$anon$1.requestNext(SimpleKafkaStreamTest.scala:18)
    at kafka.SimpleKafkaStreamTest$$anonfun$1$$anon$1.<init>(SimpleKafkaStreamTest.scala:22)
    at kafka.SimpleKafkaStreamTest$$anonfun$1.apply$mcV$sp(SimpleKafkaStreamTest.scala:18)
    at kafka.SimpleKafkaStreamTest$$anonfun$1.apply(SimpleKafkaStreamTest.scala:18)
    at kafka.SimpleKafkaStreamTest$$anonfun$1.apply(SimpleKafkaStreamTest.scala:18)

Kafka根本没有收集到这些数据。我做错什么了?

cnh2zyt3

cnh2zyt31#

是的,我是在没有一百万个谎言的情况下自己想出来的。为了让其他遇到同样问题的人受益,以下是需要在上述代码中修复的内容: ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 必须是 "latest" ,不是 "earliest" ,从队列中获取最新条目。
第二,上面的代码没有给actorsystem一个正确关闭并通知kafka两个消费者(一个在测试代码中,一个在测试代码中)现在都死了的机会。否则,队列将保持锁定,直到会话超时时间(默认为30”)过去,并且任何后续的测试运行都将无法读取kafka队列。通过让测试类也扩展来修复 BeforeAndAfterAll 包括在 afterAll 方法, Await.result (actorSystem.terminate(), 20.seconds) (10”不够长)。
第三,我发现偏移提交有时会反复失败并立即重新安排,而且这种情况可能会持续24秒(尽管我确信更长的时间是可能的)。这使 kafkaOutput.requestNext() ( kafkaOutput 实际上是一个 TestSubscriber.Probe[String] )不适合目的的;有必要改用 kafkaOutput.requestNext(2.seconds)} (使代码有机会在 try 拦网 AssertionError 形状的形状 "Expected OnNext(_), yet no element signaled [等] " 以及在上述24“期间内进行足够次数的重试。

相关问题