kafka未收到记录

envsm3lx  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(433)

我是针对0.9中使用clojure中的javainterop的新kafka消费api编写的。到目前为止,我已经取得了成功,但现在我正在尝试用它编写一些单元测试 MockConsumer 以及 MockProducer . 我的考试总是不及格,因为 (first values)nil .
我不明白为什么消费者看不到制作人发送给主题的任何信息。

(ns blah
  (:require [cheshire.core :as json]
            [clojure.test :refer [is testing deftest]])
  (:import [org.apache.kafka.clients.consumer MockConsumer OffsetResetStrategy]
           [org.apache.kafka.clients.producer MockProducer]
           [org.apache.kafka.common.serialization StringSerializer]
           [org.apache.kafka.clients.consumer KafkaConsumer ConsumerRecords ConsumerRecord]
           [org.apache.kafka.clients.producer KafkaProducer ProducerRecord]
           [java.util ArrayList]))

(defn send-message
  [producer topic value]
  (let [pr (ProducerRecord. topic value)]
    (.send producer pr)))

(defn messages
  "Return seq of messages from consumer."
  ([consumer] (messages consumer 100))
  ([consumer timeout]
   (println "poll consumer for messages")
   (let [records (seq (.poll consumer timeout))]
     (when records
       (map record->map records)))))

(deftest consuming
  (let [c (MockConsumer. (OffsetResetStrategy/EARLIEST))
        _ (.subscribe c (doto (ArrayList.) (.add "unittest")))
        p (MockProducer. true (StringSerializer.) (StringSerializer.))]
    (send-message p "unittest" (json/generate-string {:a 1 :b "two"}))
    (let [values (seq (messages c))]
      (is (= {:a 1 :b "two"}
          (first values))))
    (.close c)
    (.close p)))

有什么想法吗?

laik7k3q

laik7k3q1#

mockproducer设计用于单元测试类/函数是否生成预期的消息。例如:

(deftest producing
  (let [p (MockProducer. true (StringSerializer.) (StringSerializer.))]
    (send-message p "unittest" (json/generate-string {:a 1 :b "two"}))
    (let [values (.history p)]
      (is (= {:a 1 :b "two"}
             (json/parse-string (.value (first values)) true))))
    (.close p)))

注意对history方法的调用
与此类似,mockconsumer使用addrecord方法为使用者设置测试用例。
正如您所看到的,mockconsumer和mockproducer是完全不相关的,设计为单独使用。
如果您想要测试的是一个完整的往返过程,那么您可能会对使用这样的东西启动一个嵌入式Kafka更感兴趣

相关问题