clj-kafka - 消费者为空
clj-kafka - consumer empty
我正在尝试 usgin 库 clj-kafka
。
这是我的代码
(use [clj-kafka.producer]
[clj-kafka.zk]
[clj-kafka.consumer.zk]
[clj-kafka.core]))
(brokers {"zookeeper.connect" "localhost:2181"})
(def p (producer {"metadata.broker.list" "localhost:9092"
"serializer.class" "kafka.serializer.DefaultEncoder"
"partitioner.class" "kafka.producer.DefaultPartitioner"}))
(send-message p (message "test" (.getBytes "this is my message")))
(def config {"zookeeper.connect" "localhost:2181"
"group.id" "clj-kafka.consumer"
"auto.offset.reset" "smallest"
"auto.commit.enable" "false"})
(with-resource [c (consumer config)]
shutdown
(take 2 (messages c "test"))) ;; return ()
我用
启动 zookepper-server 和 kafka 本身
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
config/zookepper.properties:
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
和config/server.properties:
broker.id=0
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
zocket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
'problem'是我执行的时候:
(with-resource [c (consumer config)]
shutdown
(take 2 (messages c "test"))) ;; return empty ()
有什么想法吗?
提前致谢
看到这个 github issue。似乎文档不是很好。您必须使用 doall
强制实现序列(这是惰性的)。试试这个:
(with-resource [c (consumer config)]
shutdown
(doall (take 2 (messages c "test"))))
我正在尝试 usgin 库 clj-kafka
。
这是我的代码
(use [clj-kafka.producer]
[clj-kafka.zk]
[clj-kafka.consumer.zk]
[clj-kafka.core]))
(brokers {"zookeeper.connect" "localhost:2181"})
(def p (producer {"metadata.broker.list" "localhost:9092"
"serializer.class" "kafka.serializer.DefaultEncoder"
"partitioner.class" "kafka.producer.DefaultPartitioner"}))
(send-message p (message "test" (.getBytes "this is my message")))
(def config {"zookeeper.connect" "localhost:2181"
"group.id" "clj-kafka.consumer"
"auto.offset.reset" "smallest"
"auto.commit.enable" "false"})
(with-resource [c (consumer config)]
shutdown
(take 2 (messages c "test"))) ;; return ()
我用
启动 zookepper-server 和 kafka 本身bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
config/zookepper.properties:
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
和config/server.properties:
broker.id=0
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
zocket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
'problem'是我执行的时候:
(with-resource [c (consumer config)]
shutdown
(take 2 (messages c "test"))) ;; return empty ()
有什么想法吗?
提前致谢
看到这个 github issue。似乎文档不是很好。您必须使用 doall
强制实现序列(这是惰性的)。试试这个:
(with-resource [c (consumer config)]
shutdown
(doall (take 2 (messages c "test"))))