Kafka MockConsumer 没有收到记录
Kafka MockConsumer not receiving records
我正在使用 clojure 中的 java 互操作针对 0.9 中的新 kafka 消费者 API 进行写作。到目前为止我已经取得了成功,但现在我正在尝试使用 MockConsumer
和 MockProducer
编写一些单元测试。我的测试总是失败,因为 (first values)
是 nil
。
我不明白为什么消费者看不到生产者发送到主题的任何消息。
(ns blah
(:require [cheshire.core :as json]
[clojure.test :refer [is testing deftest]])
(:import [org.apache.kafka.clients.consumer MockConsumer OffsetResetStrategy]
[org.apache.kafka.clients.producer MockProducer]
[org.apache.kafka.common.serialization StringSerializer]
[org.apache.kafka.clients.consumer KafkaConsumer ConsumerRecords ConsumerRecord]
[org.apache.kafka.clients.producer KafkaProducer ProducerRecord]
[java.util ArrayList]))
(defn send-message
[producer topic value]
(let [pr (ProducerRecord. topic value)]
(.send producer pr)))
(defn messages
"Return seq of messages from consumer."
([consumer] (messages consumer 100))
([consumer timeout]
(println "poll consumer for messages")
(let [records (seq (.poll consumer timeout))]
(when records
(map record->map records)))))
(deftest consuming
(let [c (MockConsumer. (OffsetResetStrategy/EARLIEST))
_ (.subscribe c (doto (ArrayList.) (.add "unittest")))
p (MockProducer. true (StringSerializer.) (StringSerializer.))]
(send-message p "unittest" (json/generate-string {:a 1 :b "two"}))
(let [values (seq (messages c))]
(is (= {:a 1 :b "two"}
(first values))))
(.close c)
(.close p)))
有什么想法吗?
MockProducer 旨在对 class/function 是否正在生成预期消息进行单元测试。例如:
(deftest producing
(let [p (MockProducer. true (StringSerializer.) (StringSerializer.))]
(send-message p "unittest" (json/generate-string {:a 1 :b "two"}))
(let [values (.history p)]
(is (= {:a 1 :b "two"}
(json/parse-string (.value (first values)) true))))
(.close p)))
的调用
与 MockConsumer 类似,addRecord method 可以为消费者设置测试用例。
如您所见,MockConsumer 和 MockProducer 是完全不相关的,旨在单独使用。
如果你想测试的是完整的往返,你可能会对使用 this
之类的东西启动嵌入式 Kafka 更感兴趣
我正在使用 clojure 中的 java 互操作针对 0.9 中的新 kafka 消费者 API 进行写作。到目前为止我已经取得了成功,但现在我正在尝试使用 MockConsumer
和 MockProducer
编写一些单元测试。我的测试总是失败,因为 (first values)
是 nil
。
我不明白为什么消费者看不到生产者发送到主题的任何消息。
(ns blah
(:require [cheshire.core :as json]
[clojure.test :refer [is testing deftest]])
(:import [org.apache.kafka.clients.consumer MockConsumer OffsetResetStrategy]
[org.apache.kafka.clients.producer MockProducer]
[org.apache.kafka.common.serialization StringSerializer]
[org.apache.kafka.clients.consumer KafkaConsumer ConsumerRecords ConsumerRecord]
[org.apache.kafka.clients.producer KafkaProducer ProducerRecord]
[java.util ArrayList]))
(defn send-message
[producer topic value]
(let [pr (ProducerRecord. topic value)]
(.send producer pr)))
(defn messages
"Return seq of messages from consumer."
([consumer] (messages consumer 100))
([consumer timeout]
(println "poll consumer for messages")
(let [records (seq (.poll consumer timeout))]
(when records
(map record->map records)))))
(deftest consuming
(let [c (MockConsumer. (OffsetResetStrategy/EARLIEST))
_ (.subscribe c (doto (ArrayList.) (.add "unittest")))
p (MockProducer. true (StringSerializer.) (StringSerializer.))]
(send-message p "unittest" (json/generate-string {:a 1 :b "two"}))
(let [values (seq (messages c))]
(is (= {:a 1 :b "two"}
(first values))))
(.close c)
(.close p)))
有什么想法吗?
MockProducer 旨在对 class/function 是否正在生成预期消息进行单元测试。例如:
(deftest producing
(let [p (MockProducer. true (StringSerializer.) (StringSerializer.))]
(send-message p "unittest" (json/generate-string {:a 1 :b "two"}))
(let [values (.history p)]
(is (= {:a 1 :b "two"}
(json/parse-string (.value (first values)) true))))
(.close p)))
的调用
与 MockConsumer 类似,addRecord method 可以为消费者设置测试用例。
如您所见,MockConsumer 和 MockProducer 是完全不相关的,旨在单独使用。
如果你想测试的是完整的往返,你可能会对使用 this
之类的东西启动嵌入式 Kafka 更感兴趣