I'm trying to run some integration tests for a data stream application against an embedded Kafka cluster. When the whole suite is executed in an environment other than my local machine, the tests fail because some internal state is not removed properly.
I can get all the tests to pass in the non-local environment if I start/stop the Kafka cluster before/after each test, but I would rather start and stop the cluster only once, at the beginning and end of the test suite.
I tried to remove the local streams state, but that does not seem to work:
override protected def afterEach(): Unit = KStreamTestUtils.purgeLocalStreamsState(properties)
Is there a way to get my tests running without having to start/stop the cluster every time?
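One direction I'm considering (a rough sketch, not code from the project; the group name is made up) is to give every scenario its own consumer group, so offsets committed by one test cannot affect the next while the cluster stays up:

  import java.util.UUID

  // a fresh group per scenario; offsets committed by earlier scenarios
  // then have no effect on later ones
  val uniqueGroupId = s"tweet-stream-test-${UUID.randomUUID()}"
  // which would be added to the parsed config string below:
  //   akka.kafka.consumer.kafka-clients.group.id = "$uniqueGroupId"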
Below are the relevant classes.
class TweetStreamProcessorSpec extends FeatureSpec
  with MockFactory with GivenWhenThen with Eventually with BeforeAndAfterEach with BeforeAndAfterAll {

  val CLUSTER: EmbeddedKafkaCluster = new EmbeddedKafkaCluster
  val TEST_TOPIC: String = "test_topic"
  // note: nothing is ever put into `properties` before it is passed to purgeLocalStreamsState
  val properties = new Properties()

  override def beforeAll(): Unit = {
    CLUSTER.start()
    CLUSTER.createTopic(TEST_TOPIC, 1, 1)
  }

  override def afterAll(): Unit = CLUSTER.stop()

  // if these two lines are uncommented, the tests pass
  // override def afterEach(): Unit = CLUSTER.stop()
  // override protected def beforeEach(): Unit = CLUSTER.start()
  def createProducer: KafkaProducer[String, TweetEvent] = {
    // renamed from `properties` to avoid shadowing the field above;
    // KafkaProducer expects a java.util.Map, hence asJava
    // (requires import scala.collection.JavaConverters._)
    val producerProps: Map[String, AnyRef] = Map(
      KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
      VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ReflectAvroSerializer[TweetEvent]].getName,
      BOOTSTRAP_SERVERS_CONFIG -> CLUSTER.bootstrapServers(),
      SCHEMA_REGISTRY_URL_CONFIG -> CLUSTER.schemaRegistryUrlForcedToLocalhost()
    )
    new KafkaProducer[String, TweetEvent](producerProps.asJava)
  }
  def kafkaConsumerSettings: KafkaConfig = {
    val bootstrapServers = CLUSTER.bootstrapServers()
    val schemaRegistryUrl = CLUSTER.schemaRegistryUrlForcedToLocalhost()
    val zookeeper = CLUSTER.zookeeperConnect()

    KafkaConfig(
      ConfigFactory.parseString(
        s"""
        akka.kafka.bootstrap.servers = "$bootstrapServers"
        akka.kafka.schema.registry.url = "$schemaRegistryUrl"
        akka.kafka.zookeeper.servers = "$zookeeper"
        akka.kafka.topic-name = "$TEST_TOPIC"
        akka.kafka.consumer.kafka-clients.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
        akka.kafka.consumer.kafka-clients.value.deserializer = ${classOf[ReflectAvroDeserializer[TweetEvent]].getName}
        akka.kafka.consumer.kafka-clients.client.id = client1
        akka.kafka.consumer.wakeup-timeout = 20s
        akka.kafka.consumer.max-wakeups = 10
        """).withFallback(ConfigFactory.load()).getConfig("akka.kafka")
    )
  }
feature("Logging tweet data from kafka topic") {
scenario("log id and payload when consuming a update tweet event") {
publishEventsToKafka(List(upTweetEvent))
val logger = Mockito.mock(classOf[Logger])
val pipeline = new TweetStreamProcessor(kafkaConsumerSettings, logger)
pipeline.start
eventually(timeout(Span(5, Seconds))) {
Mockito.verify(logger, Mockito.times(1)).info(s"updating tweet uuid=${upTweetEvent.getUuid}, payload=${upTweetEvent.getPayload}")
}
pipeline.stop
}
scenario("log id when consuming a delete tweet event") {
publishEventsToKafka(List(delTweetEvent))
val logger = Mockito.mock(classOf[Logger])
val pipeline = new TweetStreamProcessor(kafkaConsumerSettings, logger)
pipeline.start
eventually(timeout(Span(5, Seconds))) {
Mockito.verify(logger, Mockito.times(1)).info(s"deleting tweet uuid=${delTweetEvent.getUuid}")
}
pipeline.stop
}
}
}
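Note that both scenarios call pipeline.stop without awaiting the Future it returns, so the consumer from the first scenario may still be alive and committing while the second one runs against the shared cluster. A minimal sketch of waiting for shutdown before the next scenario starts (the timeout is arbitrary):

  import scala.concurrent.Await
  import scala.concurrent.duration._

  // block until the consumer stage has really shut down before the next
  // scenario publishes and consumes against the same cluster
  Await.result(pipeline.stop, 10.seconds)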
class TweetStreamProcessor(kafkaConfig: KafkaConfig, logger: Logger)
  extends Lifecycle with TweetStreamProcessor with Logging {

  private var control: Control = _
  private val valueDeserializer: Option[Deserializer[TweetEvent]] = None

  // ...

  def tweetsSource(implicit mat: Materializer): Source[CommittableMessage[String, TweetEvent], Control] =
    Consumer.committableSource(tweetConsumerSettings, Subscriptions.topics(kafkaConfig.topicName))

  override def start: Future[Unit] = {
    control = tweetsSource(materializer)
      .mapAsync(1) { msg =>
        logTweetEvent(msg.record.value())
          .map(_ => msg.committableOffset)
      }
      .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
        batch.updated(elem)
      }
      .mapAsync(3)(_.commitScaladsl())
      .to(Sink.ignore)
      .run()
    Future.successful(()) // pass () explicitly rather than relying on auto-unit adaptation
  }

  override def stop: Future[Unit] = {
    control.shutdown()
      .map(_ => ()) // `Unit` here was the companion object, not the unit value ()
  }
}
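A gentler stop might also help, assuming the Consumer.Control API of this akka-stream-kafka version exposes both stop() and shutdown(), and that an implicit ExecutionContext is in scope; this is a sketch, not the project's actual code:

  override def stop: Future[Unit] =
    control.stop()                       // complete the source, emit no new records
      .flatMap(_ => control.shutdown())  // then tear down the consumer stage
      .map(_ => ())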
Any help would be much appreciated. Thanks in advance.