我正在尝试用 Scala 为 Kafka 编写集成测试（这两者我都还不太熟悉）。具体来说，我的主代码中有一个 `ClosedShape` 的 `RunnableGraph`。我希望通过一个 Kafka 主题向它输入数据，然后检查从另一个 Kafka 主题输出的结果，而不是对这个 `RunnableGraph` 中的单个流做单元测试。
下面是一个简化的示例:
import akka.NotUsed
import akka.actor.ActorSystem
import akka.kafka.{ProducerSettings, ConsumerSettings}
import akka.kafka.scaladsl.{Producer, Consumer}
import akka.kafka.scaladsl.Consumer.Control
import akka.stream.ClosedShape
import akka.stream.scaladsl._
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerConfig}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringSerializer, StringDeserializer}
import GraphDSL.Implicits._
/**
 * Kafka-to-Kafka greeting pipeline.
 *
 * Every record consumed from the source topic is turned into `"hello " + value`
 * and published to the target topic.
 */
object SimpleKafkaStream {

  /**
   * Builds (but does not start) the closed pipeline graph.
   *
   * @param sourceTopic          topic the pipeline consumes from
   * @param targetTopic          topic the greetings are produced to
   * @param kafkaBootstrapServer bootstrap server address used by both the consumer and the producer
   * @param actorSystem          actor system passed to the Kafka consumer/producer settings
   * @return a runnable graph; call `run()` on it to start the pipeline
   */
  def apply(sourceTopic: String, targetTopic: String, kafkaBootstrapServer: String)(
      implicit actorSystem: ActorSystem): RunnableGraph[NotUsed] = {
    val pipeline = GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      val records = source(sourceTopic, kafkaBootstrapServer)
      val greetings = transformMessage(targetTopic)
      val publisher = target(kafkaBootstrapServer)
      records ~> greetings ~> publisher
      ClosedShape
    }
    RunnableGraph.fromGraph(pipeline)
  }

  /** Turns each consumed record's value into a greeting record addressed to `targetTopic`. */
  private def transformMessage(
      targetTopic: String): Flow[ConsumerRecord[String, String], ProducerRecord[String, String], NotUsed] =
    Flow[ConsumerRecord[String, String]].map { record =>
      val greeting = "hello " + record.value()
      new ProducerRecord[String, String](targetTopic, greeting)
    }

  /**
   * Plain Kafka source of string records from `topic`.
   *
   * The same id is used for the consumer group and the client. Offsets are
   * auto-committed every 1000 ms, and the offset reset policy is "earliest".
   */
  private def source(topic: String, bootstrapServer: String)(
      implicit actorSystem: ActorSystem): Source[ConsumerRecord[String, String], Control] = {
    val consumerId = s"consumer_1_.$topic"
    val settings =
      ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer, Set(topic))
        .withBootstrapServers(bootstrapServer)
        .withGroupId(consumerId)
        .withClientId(consumerId)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
        .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    Consumer.plainSource(settings)
  }

  /** Plain Kafka producer sink for string keys/values, connected to `bootstrapServer`. */
  private def target(bootstrapServer: String)(implicit actorSystem: ActorSystem) = {
    val settings = ProducerSettings(actorSystem, new StringSerializer, new StringSerializer)
      .withBootstrapServers(bootstrapServer)
    Producer.plainSink(settings)
  }
}
然后用以下方法进行测试:
import java.util.UUID
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, ProducerSettings}
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.stream.ActorMaterializer
import akka.stream.testkit.javadsl.TestSink
import akka.stream.testkit.scaladsl.TestSource
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.scalatest.{Matchers, WordSpec}
/**
 * End-to-end test for `SimpleKafkaStream`.
 *
 * Publishes a name to the source topic and expects the greeting to arrive on the
 * target topic. Requires a Kafka broker reachable at `kafkaBootstrapServer`.
 */
class SimpleKafkaStreamTest extends WordSpec with Matchers {
  import scala.concurrent.Await
  import scala.concurrent.duration._

  "A person should be greeted" in new TestScope {
    // Always terminate the actor system, even when the assertion fails. That stops both
    // Kafka consumers (the stream's and this test's) instead of leaving them registered
    // with the broker until their session times out.
    try {
      startStream()
      send("World")
      requestNext() shouldBe "hello World"
    } finally {
      shutdown()
    }
  }

  /** Per-test fixture: configuration plus the Kafka test probes. */
  trait TestScope extends E2EConfiguration with Kafka

  /** Actor system, materializer and broker/topic settings for a single test. */
  trait E2EConfiguration {
    implicit val actorSystem: ActorSystem = ActorSystem("e2e-system")
    implicit val actorMaterializer: ActorMaterializer = ActorMaterializer()
    val kafkaBootstrapServer = "192.168.99.100:9092"
    val sourceTopic = "person"
    val targetTopic = "greeting"

    /**
     * Terminates the actor system and blocks until termination completes.
     *
     * A 10-second wait was reported as too short, hence 20 seconds.
     */
    def shutdown(): Unit = {
      Await.result(actorSystem.terminate(), 20.seconds)
      ()
    }
  }

  /** Kafka probes: a test source feeding `sourceTopic` and a test sink reading `targetTopic`. */
  trait Kafka {
    this: E2EConfiguration =>

    // Random group/client ids, so each run starts without previously committed offsets.
    // NOTE(review): with "latest", records published before this consumer is assigned its
    // partitions are skipped. This is a likely cause of the intermittent timeout in
    // requestNext() -- confirm.
    private val consumerSettings = ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer, Set(targetTopic))
      .withBootstrapServers(kafkaBootstrapServer)
      .withGroupId(UUID.randomUUID().toString)
      .withClientId(UUID.randomUUID().toString)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

    /** Probe whose `sendNext` publishes a name to `sourceTopic`. */
    val kafkaInputSource =
      TestSource.probe[String].map( name => {
        new ProducerRecord[String, String] (sourceTopic, name)
      }).to(Producer.plainSink(ProducerSettings(actorSystem, new StringSerializer, new StringSerializer)
        .withBootstrapServers(bootstrapServers = kafkaBootstrapServer))).run()

    /** Probe subscribed to `targetTopic`. */
    val kafkaOutput = Consumer.plainSource(consumerSettings).runWith(TestSink.probe(actorSystem))

    /** Requests one record from `targetTopic` and returns its value (default probe timeout). */
    def requestNext() = kafkaOutput.requestNext().value()

    /** Publishes `name` to `sourceTopic`. */
    def send(name: String) = kafkaInputSource.sendNext(name)

    /** Materializes the pipeline under test. */
    def startStream() = {
      SimpleKafkaStream(sourceTopic = sourceTopic, targetTopic = targetTopic, kafkaBootstrapServer = kafkaBootstrapServer).run()
    }
  }
}
也就是说，测试应该把 "World" 写入主题 `person`，然后从主题 `greeting` 读回 "hello World"……偶尔确实如此，但大多数时候我得到的是：
Expected OnNext(_), yet no element signaled during 3 seconds
java.lang.AssertionError: Expected OnNext(_), yet no element signaled during 3 seconds
at akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:268)
at akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:259)
at akka.stream.testkit.TestSubscriber$Probe.requestNext(StreamTestKit.scala:631)
at kafka.SimpleKafkaStreamTest$Kafka$class.requestNext(SimpleKafkaStreamTest.scala:56)
at kafka.SimpleKafkaStreamTest$$anonfun$1$$anon$1.requestNext(SimpleKafkaStreamTest.scala:18)
at kafka.SimpleKafkaStreamTest$$anonfun$1$$anon$1.<init>(SimpleKafkaStreamTest.scala:22)
at kafka.SimpleKafkaStreamTest$$anonfun$1.apply$mcV$sp(SimpleKafkaStreamTest.scala:18)
at kafka.SimpleKafkaStreamTest$$anonfun$1.apply(SimpleKafkaStreamTest.scala:18)
at kafka.SimpleKafkaStreamTest$$anonfun$1.apply(SimpleKafkaStreamTest.scala:18)
看起来 Kafka 根本没有收到这条数据。我哪里做错了？
1条答案
cnh2zyt3 的回答（1#）：
好吧，最后我自己把问题解决了。为了方便遇到同样问题的人，下面列出上述代码需要修正的地方：
第一，`ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` 必须设为 `"latest"`，而不是 `"earliest"`，这样才能从队列中取到最新的条目。

第二，上面的代码没有给 ActorSystem 正确关闭的机会，因此也没能通知 Kafka：两个消费者（一个在被测代码中，一个在测试代码中）都已经退出。这样一来，队列会一直保持锁定，直到会话超时（默认 30 秒）过去；在此期间，后续的任何测试运行都无法读取该 Kafka 队列。解决方法是让测试类同时扩展 `BeforeAndAfterAll`，并在 `afterAll` 方法中调用 `Await.result(actorSystem.terminate(), 20.seconds)`（10 秒不够长）。

第三，我发现偏移量提交有时会反复失败并被立即重新调度，这种情况可能持续 24 秒（我相信更长也是有可能的）。因此 `kafkaOutput.requestNext()` 不能满足需要（`kafkaOutput` 实际上是一个 `TestSubscriber.Probe[String]`）。必须改用 `kafkaOutput.requestNext(2.seconds)`，并把它放进 `try` 块中，捕获形如 `"Expected OnNext(_), yet no element signaled [等]"` 的 `AssertionError`。然后在上述 24 秒内重试足够多的次数。