关闭kafka连接

vsaztqbk  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(609)

我有一个应用程序,应该发送有限数量的消息Kafka,然后退出。出于某种原因,Kafka的联系即使我关闭了制作人也不会消失。我的实现(在scala中)或多或少

object Kafka {
  private val props = new Properties()

  props.put("compression.codec", DefaultCompressionCodec.codec.toString)
  props.put("producer.type", "sync")
  props.put("metadata.broker.list", "localhost:9092")
  props.put("batch.num.messages", "200")
  props.put("message.send.max.retries", "3")
  props.put("request.required.acks", "-1")
  props.put("client.id", "myclient")

  private val producer = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(props))
  private def encode(msg: Message) = new KeyedMessage("topic", msg.id.getBytes, write(msg).getBytes)

  def send(msg: Message) = Try(producer.send(encode(msg)))
  def close() = producer.close()
}

在这里 Message 是一个简单的case类,如何将其转换为byte数组并不是很重要。
消息确实到了,但当我最终打电话给 Kafka.close() ,应用程序不退出,连接似乎未被释放。
有没有办法明确要求Kafka终止连接?

afdcj2ne

afdcj2ne1#

def close()=生产者.关闭()
这将创建一个名为“close”的函数来调用producer.close()。我看不到任何证据表明您的代码实际上关闭了producer。
你只需要打电话给:producer.close

相关问题