Kafka test intermittently fails/passes

Asked by gtlvzcf8 on 2021-06-08 in Kafka

I'm trying to write a simple test for an abstraction over the Kafka Scala client in Kafka 0.8.2. It basically just writes a message to Kafka, and then I try to read it back. However, I was running into intermittent failures, so I boiled the test down to the code below. This test sometimes (rarely) passes and sometimes fails. What am I doing wrong?

package mykafkatest

import java.net.ServerSocket
import java.nio.file.Files
import java.util.{UUID, Properties}

import kafka.consumer.{Whitelist, ConsumerConfig, Consumer}
import kafka.producer.{ProducerConfig, Producer, KeyedMessage}
import kafka.serializer.StringDecoder
import kafka.server.KafkaConfig
import kafka.server.KafkaServerStartable
import org.apache.curator.test.TestingServer

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

class KafkaSenderTest extends org.scalatest.FunSpecLike with org.scalatest.ShouldMatchers with org.scalatest.BeforeAndAfterAll {

  import scala.concurrent.ExecutionContext.Implicits.global
  val zkServer = new TestingServer()

  val socket = new ServerSocket(0)
  val port = socket.getLocalPort.toString
  socket.close()
  val tmpDir = Files.createTempDirectory("kafka-test-logs")

  val serverProps = new Properties
  serverProps.put("broker.id", port)
  serverProps.put("log.dirs", tmpDir.toAbsolutePath.toString)
  serverProps.put("host.name", "localhost")
  serverProps.put("zookeeper.connect", zkServer.getConnectString)
  serverProps.put("port", port)

  val config = new KafkaConfig(serverProps)
  val kafkaServer = new KafkaServerStartable(config)

  override def beforeAll ={
    kafkaServer.startup()
  }

  override def afterAll = {
    kafkaServer.shutdown()
  }

  it("should put messages on a kafka queue") {
    println("zkServer: " + zkServer.getConnectString)
    println("broker port: " + port)

    val consumerProps = new Properties()
    consumerProps.put("group.id", UUID.randomUUID().toString)
    consumerProps.put("zookeeper.connect", zkServer.getConnectString)

    val consumerConnector = Consumer.create(new ConsumerConfig(consumerProps))
    val topic = "some-topic"
    val filterSpec = new Whitelist(topic)
    val stream = consumerConnector.createMessageStreamsByFilter(filterSpec, 1, new StringDecoder, new StringDecoder).head

    val producerProps = new Properties()
    producerProps.put("metadata.broker.list","localhost:"+port)

    val sender = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps))
    val keyedMessage = new KeyedMessage[Array[Byte], Array[Byte]](topic, "awesome message".getBytes("UTF-8"))
    sender.send(keyedMessage)

    val msg = Await.result(Future { stream.take(1) }, 5 seconds)
    msg.headOption should not be(empty)

  }
}

Edit: I've created a new project containing the following build.sbt and the code above as a test class.

name := "mykafkatest"

version := "1.0"

scalaVersion := "2.11.5"

libraryDependencies ++= Seq(
  "org.apache.kafka" %% "kafka" % "0.8.2.0",

  "org.scalatest" %% "scalatest" % "2.2.2" % "test",
  "org.apache.curator" % "curator-test" % "2.7.0" % "test"
)

The test seems to pass more often now, but still fails intermittently...

Answer 1 (2izufjch):

I think this is some kind of message buffering issue. This works for me if you send 200 messages:

(1 to 200).foreach(i => sender.send(keyedMessage))

Sending 199 messages fails. I tried changing the configuration around but couldn't find any magic to make a single message work, although I believe there is some configuration that could make this work.
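One place to look is the producer configuration. The following is a minimal sketch of Kafka 0.8-era producer properties that control buffering and acknowledgment; whether these particular values fix the single-message case here is an assumption, not something verified against this test:

```scala
import java.util.Properties

val producerProps = new Properties()
producerProps.put("metadata.broker.list", "localhost:" + port)
// Send each message synchronously instead of batching it in the async queue.
producerProps.put("producer.type", "sync")
// Wait for the partition leader to acknowledge the write before returning.
producerProps.put("request.required.acks", "1")
```

With `producer.type` set to `async`, the old producer buffers messages and flushes them on a batch-size or time threshold, which could explain why a burst of 200 messages gets through while a single message sits in the queue.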

Answer 2 (jljoyd4f):

You may have a race condition where the consumer doesn't actually finish initializing until after the message is sent, and then ignores the message because by default it starts at the largest offset.
Try adding

consumerProps.put("auto.offset.reset", "smallest")

to your consumer properties.
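In the context of the test above, the consumer properties would then look like this. This is a sketch only; note that `"smallest"` is the value used by the 0.8-era high-level consumer (newer clients use `"earliest"` for the analogous `auto.offset.reset` setting):

```scala
import java.util.{Properties, UUID}

val consumerProps = new Properties()
consumerProps.put("group.id", UUID.randomUUID().toString)
consumerProps.put("zookeeper.connect", zkServer.getConnectString)
// Start from the beginning of the log if the group has no committed offset,
// so a message produced before the consumer finishes registering is not skipped.
consumerProps.put("auto.offset.reset", "smallest")
```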
