No data from the Kafka Consumer Python - Consumer keeps listening but nothing comes out
I'm looking for a way to feed my API (on localhost) into my Docker setup using Kafka.
My producer (below) works like a charm. I know this because I get output when I print res.text.
import json
import requests
from kafka import KafkaProducer
import time
# get data
res = requests.get('http://127.0.0.1:5000/twitter')
#print(res.text)
# use kafka
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])#, api_version='2.0.0')
producer.send('test', json.dumps(res.text).encode('utf-8'))
time.sleep(1)
#producer.flush()
However, my consumer doesn't work. Here's what I've tried so far.
It currently gets stuck at the for loop.
import kafka
import json
import requests
from kafka import KafkaConsumer

# use kafka
consumer = KafkaConsumer('test', bootstrap_servers=['localhost:9092'], api_version='2.0.0', group_id="test_id", value_deserializer=json.loads)
print('before for ')
consumer.subscribe('test')
for msg in consumer:
    print('IN for')
    #print(type(consumer))
    print(json.loads(msg.value.decode()))
    #print(consumer)
I'm missing something somewhere, but I don't know what.
When I stop it manually, I get the following error from Docker:
<class 'kafka.consumer.group.KafkaConsumer'>
^CTraceback (most recent call last):
  File "consumer.py", line 11, in <module>
    for m in consumer:
  File "/usr/lib/python3.7/site-packages/kafka/consumer/group.py", line 1193, in __next__
    return self.next_v2()
  File "/usr/lib/python3.7/site-packages/kafka/consumer/group.py", line 1201, in next_v2
    return next(self._iterator)
  File "/usr/lib/python3.7/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2
    record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
  File "/usr/lib/python3.7/site-packages/kafka/consumer/group.py", line 655, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/usr/lib/python3.7/site-packages/kafka/consumer/group.py", line 680, in _poll_once
    self._update_fetch_positions(self._subscription.missing_fetch_positions())
  File "/usr/lib/python3.7/site-packages/kafka/consumer/group.py", line 1112, in _update_fetch_positions
    self._fetcher.update_fetch_positions(partitions)
  File "/usr/lib/python3.7/site-packages/kafka/consumer/fetcher.py", line 186, in update_fetch_positions
    self._reset_offset(tp)
  File "/usr/lib/python3.7/site-packages/kafka/consumer/fetcher.py", line 237, in _reset_offset
    offsets = self._retrieve_offsets({partition: timestamp})
  File "/usr/lib/python3.7/site-packages/kafka/consumer/fetcher.py", line 302, in _retrieve_offsets
    time.sleep(self.config['retry_backoff_ms'] / 1000.0)
KeyboardInterrupt
version: "3.7"
services:
  spark-master:
    image: bde2020/spark-master:3.0.1-hadoop3.2
    ports:
      - "8080:8080"
      - "7077:7077"
    volumes:
      - ./work:/home/jovyan/work
    environment:
      - "SPARK_LOCAL_IP=spark-master"
  spark-worker:
    image: bde2020/spark-worker:3.0.1-hadoop3.2
    depends_on:
      - spark-master
    environment:
      - SPARK_MASTER=spark://spark-master:7077
      - SPARK_WORKER_CORES=2
      - SPARK_WORKER_MEMORY=3G
      - SPARK_DRIVER_MEMORY=2G
      - SPARK_EXECUTOR_MEMORY=2G
    volumes:
      - ./work:/home/jovyan/work
  pyspark-notebook:
    image: jupyter/pyspark-notebook
    container_name: pyspark_notebook
    ports:
      - "8888:8888"
    volumes:
      - ./work:/home/jovyan/work
      - ./work/model:/tmp/model_prediction
    environment:
      - PYSPARK_PYTHON=/usr/bin/python3
      - PYSPARK_DRIVER_PYTHON=ipython3
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    expose:
      - "2181"
  kafka:
    image: wurstmeister/kafka:2.11-2.0.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
  mongo:
    image: mongo
    restart: always
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example
  mongo-express:
    image: mongo-express
    restart: always
    ports:
      - 8081:8081
    environment:
      ME_CONFIG_MONGODB_ADMINUSERNAME: root
      ME_CONFIG_MONGODB_ADMINPASSWORD: example
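One detail in this compose file matters for everything that follows: Kafka advertises two listeners. A client running on the host must connect via the OUTSIDE listener (localhost:9092, published through ports), while a client running inside the compose network must use the INSIDE listener (kafka:9093). A tiny sketch of that mapping (the helper name is mine; the addresses come straight from KAFKA_ADVERTISED_LISTENERS above):

```python
# Hypothetical helper: pick the bootstrap address based on where the client runs.
# Addresses are taken from the KAFKA_ADVERTISED_LISTENERS setting in the compose file.
def bootstrap_for(runs_in_compose_network: bool) -> str:
    if runs_in_compose_network:
        return 'kafka:9093'      # INSIDE listener: containers on the compose network
    return 'localhost:9092'      # OUTSIDE listener: scripts run on the host

print(bootstrap_for(False))  # host-side producer/consumer scripts
print(bootstrap_for(True))   # containers such as the pyspark notebook
```

Connecting to the wrong listener from the wrong side is exactly what leaves a consumer listening forever with nothing coming out.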
Can you help me?
Using the same docker-compose file.
From the host
Create the topic
$ docker-compose up -d
$ docker-compose exec kafka /opt/kafka/bin/kafka-topics.sh --create --topic test --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1
Created topic "test".
$ docker-compose exec kafka /opt/kafka/bin/kafka-topics.sh --list --zookeeper zookeeper:2181
test
Verify the API is running
$ curl -H 'Content-Type: application/json' localhost:5000/twitter
{"tweet":"foobar"}
Install kafka-python
and run the producer (with the flush line uncommented)
$ pip install requests kafka-python
$ python producer.py
Verify the data is in the topic
$ docker-compose exec kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
"{\"tweet\":\"foobar\"}\n"
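Side note on why that output has escaped quotes: res.text is already a JSON string, so the producer's json.dumps(res.text) serializes it a second time. A minimal sketch (the api_text value is assumed from the curl output earlier in the thread):

```python
import json

# What res.text plausibly contained, per the curl output (note requests keeps the trailing newline)
api_text = '{"tweet":"foobar"}\n'

# json.dumps on a string that is *already* JSON wraps it in quotes and escapes it
payload = json.dumps(api_text)
print(payload)  # "{\"tweet\":\"foobar\"}\n" -- exactly what the console consumer shows

# A json.loads on the consumer side therefore yields the original string, not a dict
roundtrip = json.loads(payload)
assert roundtrip == api_text
```

Sending res.text.encode('utf-8') directly would avoid the double encoding, but it doesn't affect whether messages arrive.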
From inside a container
Using the pyspark notebook at http://localhost:8888
Open a terminal tab
$ conda install kafka-python
(base) jovyan@3eaf696e1135:~$ python work/consumer.py
before for
IN for
{"tweet":"foobar"}
New consumer code
import kafka
import json
import requests
from kafka import KafkaConsumer

# use kafka
consumer = KafkaConsumer('test',
                         bootstrap_servers=['kafka:9093'],  # needs to be the kafka INSIDE:// listener address
                         api_version='2.0.0',
                         group_id="test_id",
                         auto_offset_reset='earliest',  # you're missing this
                         value_deserializer=json.loads)
print('before for ')
for msg in consumer:
    print('IN for')
    #print(type(consumer))
    print(msg.value)
    #print(consumer)
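A second fix hidden in that code: with value_deserializer=json.loads, kafka-python applies json.loads to the raw bytes before handing the record over, so msg.value is already deserialized. The original loop's json.loads(msg.value.decode()) would have raised once messages did arrive, which is why the new loop prints msg.value directly. A sketch of what the deserializer produces (the payload bytes are assumed from the console-consumer output above):

```python
import json

# Raw bytes on the wire, as shown by the console consumer (double-encoded JSON string)
raw_bytes = b'"{\\"tweet\\":\\"foobar\\"}\\n"'

# What value_deserializer=json.loads turns msg.value into before you ever see it
value = json.loads(raw_bytes)

print(type(value))  # <class 'str'> -- a str, not bytes
# value.decode() would raise AttributeError here: str has no .decode() in Python 3,
# so print(msg.value) is the right call once a deserializer is configured
```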
I found what was wrong...
The Docker image wasn't valid.
I changed it, and it's working now.
I made my own Dockerfile.