Create a new kafka topic using python from a container connecting to another container

I want to create a new Kafka topic using Python, but when I try to create a KafkaAdminClient with server="kafka:9092" I get an error:

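from kafka.admin import KafkaAdminClient

# Inside KafkaManager.__init__; server is "kafka:9092" (see the traceback below)
self._kafka_admin = KafkaAdminClient(
    bootstrap_servers=[server],
    api_version=(0, 10, 2),
    api_version_auto_timeout_ms=120000)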

The error I get:

Traceback (most recent call last):
  File "main.py", line 47, in <module>
    kafka_manager = KafkaManager("kafka:9092") 
  File "/app/src/kafka/kafka_manager.py", line 24, in __init__
    self._kafka_admin = KafkaAdminClient(
  File "/usr/local/lib/python3.8/site-packages/kafka/admin/client.py", line 211, in __init__
    self._client.check_version(timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
  File "/usr/local/lib/python3.8/site-packages/kafka/client_async.py", line 900, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable

In addition, I have written the following docker-compose file:

version: '3'
services:
  spark-master:
    image: docker.io/bitnami/spark:2
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    volumes:
      - type: bind
        source: ./conf/log4j.properties
        target: /opt/bitnami/spark/conf/log4j.properties
    ports:
      - '8080:8080'
      - '7077:7077'
    networks:
      - spark
    container_name: spark
  spark-worker-1:
    image: docker.io/bitnami/spark:2
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://localhost:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    volumes:
      - type: bind
        source: ./conf/log4j.properties
        target: /opt/bitnami/spark/conf/log4j.properties
    ports:
      - '8081:8081'
    container_name: spark-worker
    networks:
      - spark
    depends_on:
      - spark-master
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
    container_name: zookeeper
    networks: 
      - rmoff_kafka
  kafka:
    image: confluentinc/cp-kafka:5.5.0
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    container_name: kafka
    networks: 
      - rmoff_kafka
  app:
    build:
      context: ./
    depends_on: 
      - kafka
    ports:
      - 5000:5000
    container_name: app
    networks: 
      - rmoff_kafka

networks:
  spark:
    driver: bridge
  rmoff_kafka:
    name: rmoff_kafka
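Note that `depends_on: - kafka` only controls the order in which containers start; it does not wait until the broker inside the kafka container is accepting connections, so the app can run before Kafka is ready. A minimal sketch, assuming the app shares the rmoff_kafka network and the broker listens on kafka:9092, that polls the TCP port using only the Python standard library (`wait_for_port` is a hypothetical helper, not part of kafka-python):

```python
import socket
import time


def wait_for_port(host: str, port: int, attempts: int = 30, delay: float = 2.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False after `attempts` failures."""
    for attempt in range(attempts):
        try:
            # create_connection raises an OSError subclass (ConnectionRefusedError,
            # socket.gaierror, socket.timeout) while the broker is not reachable yet.
            with socket.create_connection((host, port), timeout=delay):
                return True
        except OSError:
            if attempt < attempts - 1:
                time.sleep(delay)  # back off before the next attempt
    return False


if __name__ == "__main__":
    # "kafka" is the service/container name on the rmoff_kafka network.
    if not wait_for_port("kafka", 9092):
        raise SystemExit("Kafka broker at kafka:9092 did not become reachable")
```

An open port only shows that the broker process is listening; the broker may still need a moment before it serves metadata requests, so a retry around the client creation itself is still a sensible extra safeguard.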

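Finally, the setup looks like this, with 2 containers (1 for the Python application and 1 for Kafka):

[Image: container layout]

Output of `docker ps` showing the container details:

[Image: `docker ps` output]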

Solution: change the hostname to the name of the container (kafka), and add a sleep() call in Python to wait for the Kafka container to finish starting.
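A fixed sleep() works but has to guess how long Kafka takes to start. A variation on the same idea is to retry creating the admin client until `NoBrokersAvailable` stops being raised, then create the topic. The sketch below assumes the kafka-python package (`KafkaAdminClient`, `NewTopic`, `NoBrokersAvailable`, `TopicAlreadyExistsError`); the `retry` and `create_topic` helpers and the topic name "my-topic" are hypothetical. It omits `api_version` so kafka-python probes the broker version itself.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry(
    factory: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 10,
    delay: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call factory() until it succeeds, sleeping `delay` seconds between failed attempts."""
    for attempt in range(1, attempts + 1):
        try:
            return factory()
        except retry_on:
            if attempt == attempts:
                raise  # give up and surface the last error
            sleep(delay)
    raise ValueError("attempts must be >= 1")


def create_topic(server: str, topic: str) -> None:
    # Imported here so the retry helper above can be used without kafka-python installed.
    from kafka.admin import KafkaAdminClient, NewTopic
    from kafka.errors import NoBrokersAvailable, TopicAlreadyExistsError

    # Retry instead of one fixed sleep(): the broker may still be starting up.
    admin = retry(
        lambda: KafkaAdminClient(bootstrap_servers=[server]),
        retry_on=(NoBrokersAvailable,),
    )
    try:
        admin.create_topics(
            new_topics=[NewTopic(name=topic, num_partitions=1, replication_factor=1)]
        )
    except TopicAlreadyExistsError:
        pass  # topic was already created on an earlier run
    finally:
        admin.close()


if __name__ == "__main__":
    # "kafka:9092" matches KAFKA_ADVERTISED_LISTENERS in the compose file;
    # "my-topic" is a hypothetical topic name.
    create_topic("kafka:9092", "my-topic")
```

`replication_factor=1` is used because the compose file runs a single broker (KAFKA_BROKER_ID: 1); a higher value would be rejected by that broker.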