Kafka MockConsumer 没有收到记录

Kafka MockConsumer not receiving records

我正在使用 clojure 中的 java 互操作针对 0.9 中的新 kafka 消费者 API 进行写作。到目前为止我已经取得了成功,但现在我正在尝试使用 MockConsumerMockProducer 编写一些单元测试。我的测试总是失败,因为 (first values)nil

我不明白为什么消费者看不到生产者发送到主题的任何消息。

(ns blah
  (:require [cheshire.core :as json]
            [clojure.test :refer [is testing deftest]])
  (:import [org.apache.kafka.clients.consumer MockConsumer OffsetResetStrategy]
           [org.apache.kafka.clients.producer MockProducer]
           [org.apache.kafka.common.serialization StringSerializer]
           [org.apache.kafka.clients.consumer KafkaConsumer ConsumerRecords ConsumerRecord]
           [org.apache.kafka.clients.producer KafkaProducer ProducerRecord]
           [java.util ArrayList]))

(defn send-message
  [producer topic value]
  (let [pr (ProducerRecord. topic value)]
    (.send producer pr)))

(defn messages
  "Return seq of messages from consumer."
  ([consumer] (messages consumer 100))
  ([consumer timeout]
   (println "poll consumer for messages")
   (let [records (seq (.poll consumer timeout))]
     (when records
       (map record->map records)))))

(deftest consuming
  (let [c (MockConsumer. (OffsetResetStrategy/EARLIEST))
        _ (.subscribe c (doto (ArrayList.) (.add "unittest")))
        p (MockProducer. true (StringSerializer.) (StringSerializer.))]
    (send-message p "unittest" (json/generate-string {:a 1 :b "two"}))
    (let [values (seq (messages c))]
      (is (= {:a 1 :b "two"}
          (first values))))
    (.close c)
    (.close p)))

有什么想法吗?

MockProducer 旨在对 class/function 是否正在生成预期消息进行单元测试。例如:

(deftest producing
  (let [p (MockProducer. true (StringSerializer.) (StringSerializer.))]
    (send-message p "unittest" (json/generate-string {:a 1 :b "two"}))
    (let [values (.history p)]
      (is (= {:a 1 :b "two"}
             (json/parse-string (.value (first values)) true))))
    (.close p)))

注意对 the history method

的调用

与 MockConsumer 类似,addRecord method 可以为消费者设置测试用例。

如您所见,MockConsumer 和 MockProducer 是完全不相关的,旨在单独使用。

如果你想测试的是完整的往返,你可能会对使用 this

之类的东西启动嵌入式 Kafka 更感兴趣