我是针对0.9中使用clojure中的javainterop的新kafka消费api编写的。到目前为止,我已经取得了成功,但现在我正在尝试用它编写一些单元测试 MockConsumer
以及 MockProducer
. 我的考试总是不及格,因为 (first values)
是 nil
.
我不明白为什么消费者看不到制作人发送给主题的任何信息。
(ns blah
(:require [cheshire.core :as json]
[clojure.test :refer [is testing deftest]])
(:import [org.apache.kafka.clients.consumer MockConsumer OffsetResetStrategy]
[org.apache.kafka.clients.producer MockProducer]
[org.apache.kafka.common.serialization StringSerializer]
[org.apache.kafka.clients.consumer KafkaConsumer ConsumerRecords ConsumerRecord]
[org.apache.kafka.clients.producer KafkaProducer ProducerRecord]
[java.util ArrayList]))
(defn send-message
[producer topic value]
(let [pr (ProducerRecord. topic value)]
(.send producer pr)))
(defn messages
"Return seq of messages from consumer."
([consumer] (messages consumer 100))
([consumer timeout]
(println "poll consumer for messages")
(let [records (seq (.poll consumer timeout))]
(when records
(map record->map records)))))
(deftest consuming
(let [c (MockConsumer. (OffsetResetStrategy/EARLIEST))
_ (.subscribe c (doto (ArrayList.) (.add "unittest")))
p (MockProducer. true (StringSerializer.) (StringSerializer.))]
(send-message p "unittest" (json/generate-string {:a 1 :b "two"}))
(let [values (seq (messages c))]
(is (= {:a 1 :b "two"}
(first values))))
(.close c)
(.close p)))
有什么想法吗?
1条答案
按热度按时间laik7k3q1#
mockproducer设计用于单元测试类/函数是否生成预期的消息。例如:
注意对history方法的调用
与此类似,mockconsumer使用addrecord方法为使用者设置测试用例。
正如您所看到的,mockconsumer和mockproducer是完全不相关的,设计为单独使用。
如果您想要测试的是一个完整的往返过程,那么您可能会对使用这样的东西启动一个嵌入式Kafka更感兴趣