Create a new kafka topic using python from a container connecting to another container
I want to create a new kafka topic using python, and when I try to create the KafkaAdminClient with server="kafka:9092" I get an error:
self._kafka_admin = KafkaAdminClient(
    bootstrap_servers=[server],
    api_version=(0, 10, 2),
    api_version_auto_timeout_ms=120000)
The error I get:
Traceback (most recent call last):
  File "main.py", line 47, in <module>
    kafka_manager = KafkaManager("kafka:9092")
  File "/app/src/kafka/kafka_manager.py", line 24, in __init__
    self._kafka_admin = KafkaAdminClient(
  File "/usr/local/lib/python3.8/site-packages/kafka/admin/client.py", line 211, in __init__
    self._client.check_version(timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
  File "/usr/local/lib/python3.8/site-packages/kafka/client_async.py", line 900, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
Also, I have built the following docker-compose file:
version: '3'
services:
  spark-master:
    image: docker.io/bitnami/spark:2
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    volumes:
      - type: bind
        source: ./conf/log4j.properties
        target: /opt/bitnami/spark/conf/log4j.properties
    ports:
      - '8080:8080'
      - '7077:7077'
    networks:
      - spark
    container_name: spark
  spark-worker-1:
    image: docker.io/bitnami/spark:2
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://localhost:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    volumes:
      - type: bind
        source: ./conf/log4j.properties
        target: /opt/bitnami/spark/conf/log4j.properties
    ports:
      - '8081:8081'
    container_name: spark-worker
    networks:
      - spark
    depends_on:
      - spark-master
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
    container_name: zookeeper
    networks:
      - rmoff_kafka
  kafka:
    image: confluentinc/cp-kafka:5.5.0
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    container_name: kafka
    networks:
      - rmoff_kafka
  app:
    build:
      context: ./
    depends_on:
      - kafka
    ports:
      - 5000:5000
    container_name: app
    networks:
      - rmoff_kafka
networks:
  spark:
    driver: bridge
  rmoff_kafka:
    name: rmoff_kafka
Finally, the structure is like this, with 2 containers (1 for the python app and 1 for kafka), and the docker ps output shows the container details.
The solution was to change the hostname to the container's name (kafka), and to add a sleep() call in python to wait for the kafka container to start up.
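A minimal sketch of that fix, assuming the kafka-python client. Instead of a fixed sleep(), this retries the connection until the broker is ready; the wait_for helper and its parameter names are illustrative, not part of the original code:

```python
import time


def wait_for(factory, retries=30, delay=2.0, exceptions=(Exception,)):
    """Call factory() until it succeeds or retries are exhausted,
    sleeping `delay` seconds between attempts."""
    for attempt in range(retries):
        try:
            return factory()
        except exceptions:
            if attempt == retries - 1:
                raise
            time.sleep(delay)


# Usage sketch inside the app container (the 'kafka' hostname resolves
# because app and kafka share the rmoff_kafka network):
#
# from kafka.admin import KafkaAdminClient, NewTopic
# from kafka.errors import NoBrokersAvailable
#
# admin = wait_for(
#     lambda: KafkaAdminClient(bootstrap_servers=["kafka:9092"]),
#     exceptions=(NoBrokersAvailable,))
# admin.create_topics([NewTopic("my-topic", num_partitions=1,
#                               replication_factor=1)])
```

This avoids guessing a startup delay: the client is created as soon as the broker accepts connections, and the original NoBrokersAvailable error is re-raised only if the broker never comes up.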