kafka消费者不返回任何事件

sauutmhj  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(277)

下面的scala kafka消费者不会返回来自 poll 打电话。
但是,主题是正确的,我可以看到使用控制台使用者将事件发送到主题:

/opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic my_topic --from-beginning

我在下面的scala代码示例中也看到了这个主题,当我使用调试器和调用 kafkaConsumer.listTopics() 而且,这是从单个单元测试调用的,所以我只创建这个trait和consumer的一个示例(即,另一个consumer示例不能使用消息)。我还使用了一个随机组id。
下面的代码/配置有什么问题吗?

import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.util.Random

trait KafkaTest {

  val kafkaConsumerProperties = new Properties()

  kafkaConsumerProperties.put("bootstrap.servers", "kafka:9092")

  kafkaConsumerProperties.put("group.id", Random.alphanumeric.take(10).mkString)

  kafkaConsumerProperties.put("key.deserializer", classOf[ByteArrayDeserializer])

  kafkaConsumerProperties.put("value.deserializer", classOf[StringDeserializer])

  val kafkaConsumer = new KafkaConsumer[String, String](kafkaConsumerProperties)

kafkaConsumer.subscribe(java.util.Collections.singletonList("my_topic"))

  def checkKafkaHasReceivedEvent(): Assertion = {

    val kafkaEvents = kafkaConsumer.poll(2000) // Always returns 0 events?
    ...
  }
}

增加轮询超时也无济于事。

wfveoks0

wfveoks01#

要从头开始读取,auto\u offset\u reset\u config属性必须设置为earlish,默认情况下为“latest”

kafkaConsumerProperties.put(
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
    OffsetResetStrategy.EARLIEST.toString().toLowerCase())

相关问题