单元测试的kafka主题

jjhzyzn0  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(318)

我需要在kafka应用程序上执行单元测试,避免使用第三方库。
我现在的问题是,我想在测试之间清除所有的主题,但我不知道怎么做。
这是我的临时解决方案:提交每个测试后生成的每条消息,并将所有测试使用者放在同一个使用者组中。

override protected def afterEach():Unit={
    val cleanerConsumer= newConsumer(Seq.empty)
    val topics=cleanerConsumer.listTopics()
    println("pulisco")
    cleanerConsumer.subscribe(topics.keySet())
    cleanerConsumer.poll(100)
    cleanerConsumer.commitSync()
    cleanerConsumer.close()
}

但这不管用,我也不知道为什么。
例如,当我在测试中创建一个新的使用者时, messages 包含在上一个测试中生成的消息。

val consumerProbe = newConsumer(SMSGatewayTopic)

val messages = consumerProbe.poll(1000)

我怎样才能解决这个问题?

bz4sfanl

bz4sfanl1#

您还可以在测试源中嵌入kafka/zookeeper示例,以便在这些隔离服务上拥有更多控制器。

trait Kafka { self: ZooKeeper =>
  Kafka.start()
}

object Kafka {
  import org.apache.hadoop.fs.FileUtil
  import kafka.server.KafkaServer

  @volatile private var started = false

  lazy val logDir = java.nio.file.Files.createTempDirectory("kafka-log").toFile

  lazy val kafkaServer: KafkaServer = {
    val config = com.typesafe.config.ConfigFactory.
      load(this.getClass.getClassLoader)

    val (host, port) = {
      val (h, p) = config.getString("kafka.servers").span(_ != ':')
      h -> p.drop(1).toInt
    }

    val serverConf = new kafka.server.KafkaConfig({
      val props = new java.util.Properties()
      props.put("port", port.toString)
      props.put("broker.id", port.toString)
      props.put("log.dir", logDir.getAbsolutePath)

      props.put(
        "zookeeper.connect",
        s"localhost:${config getInt "test.zookeeper.port"}"
      )

      props
    })

    new KafkaServer(serverConf)
  }

  def start(): Unit = if (!started) {
    try {
      kafkaServer.startup()
      started = true
    } catch {
      case err: Throwable =>
        println(s"fails to start Kafka: ${err.getMessage}")
        throw err
    }
  }

  def stop(): Unit = try {
    if (started) kafkaServer.shutdown()
  } finally {
    FileUtil.fullyDelete(logDir)
  }
}

trait ZooKeeper {
  ZooKeeper.start()
}

object ZooKeeper {
  import java.nio.file.Files
  import java.net.InetSocketAddress
  import org.apache.hadoop.fs.FileUtil
  import org.apache.zookeeper.server.ZooKeeperServer
  import org.apache.zookeeper.server.ServerCnxnFactory

  @volatile private var started = false
  lazy val logDir = Files.createTempDirectory("zk-log").toFile
  lazy val snapshotDir = Files.createTempDirectory("zk-snapshots").toFile

  lazy val (zkServer, zkFactory) = {
    val srv = new ZooKeeperServer(
      snapshotDir, logDir, 500
    )

    val config = com.typesafe.config.ConfigFactory.
      load(this.getClass.getClassLoader)
    val port = config.getInt("test.zookeeper.port")

    srv -> ServerCnxnFactory.createFactory(
      new InetSocketAddress("localhost", port), 1024
    )
  }

  def start(): Unit = if (!zkServer.isRunning) {
    try {
      zkFactory.startup(zkServer)

      started = true

      while (!zkServer.isRunning) {
        Thread.sleep(500)
      }
    } catch {
      case err: Throwable =>
        println(s"fails to start ZooKeeper: ${err.getMessage}")
        throw err
    }
  }

  def stop(): Unit = try {
    if (started) zkFactory.shutdown()
  } finally {
    try { FileUtil.fullyDelete(logDir) } catch { case _: Throwable => () }
    FileUtil.fullyDelete(snapshotDir)
  }
}

测试类可以 extends Kafka with ZooKeeper 以确保这一点。
如果测试jvm没有分叉, Tests.Cleanup 在sbt中 testOptions in Test 设置可用于在测试后停止嵌入式服务。

js4nwp54

js4nwp542#

我建议,你只需在测试前重新创建所有主题。例如,这是kafka测试创建/删除主题的方式:
github上的kafka存储库

相关问题