Consuming Kafka via REST Proxy with Python

I'm running a Kafka environment via Docker, and it comes up correctly!

But I can't get the REST queries from my Python script to work...

I'm trying to read all the messages received on the stream!

Any suggestions for a fix?

Sorry for the long output; I wanted to lay out the problem in detail to make debugging easier :)

consumer.py

#!/usr/bin/python3
# -*- coding: utf-8 -*-
import requests
import base64
import json
import sys

REST_PROXY_URL = 'http://localhost:8082'
CONSUMER_INSTANCE = 'zabbix_consumer'
NAME_TOPIC = 'zabbix'

def delete_consumer(BASE_URI):
    '''Delete the consumer so a stale instance never blocks re-creation'''

    headers = {'Accept': 'application/vnd.kafka.v2+json'}
    requests.delete(BASE_URI, headers=headers)

def create_consumer():
    '''Create the consumer instance and return its base_uri'''

    delete_consumer(f'{REST_PROXY_URL}/consumers/{CONSUMER_INSTANCE}/instances/{CONSUMER_INSTANCE}')
    PAYLOAD = {'format': 'json', 'name': f'{CONSUMER_INSTANCE}', 'auto.offset.reset': 'earliest'}
    HEADERS = {'Content-Type': 'application/vnd.kafka.v2+json'}
    r = requests.post(f'{REST_PROXY_URL}/consumers/{CONSUMER_INSTANCE}', data=json.dumps(PAYLOAD),
                      headers=HEADERS)
    if r.status_code != 200:
        print('Status Code: ' + str(r.status_code))
        print(r.text)
        sys.exit('Error thrown while creating consumer')
    return r.json()['base_uri']

def get_messages():
    '''Get the messages from the consumer'''

    BASE_URI = create_consumer()
    HEADERS = {'Accept': 'application/vnd.kafka.v2+json'}
    r = requests.get(BASE_URI + f'/topics/{NAME_TOPIC}', headers=HEADERS, timeout=30)
    if r.status_code != 200:
        print('Status Code: ' + str(r.status_code))
        print(r.text)
        sys.exit('Error thrown while getting message')
    for message in r.json():
        # b64decode returns bytes, so decode before concatenating with str
        if message['key'] is not None:
            print('Message Key:' + base64.b64decode(message['key']).decode('utf-8'))
        print('Message Value:' + base64.b64decode(message['value']).decode('utf-8'))

if __name__ == '__main__':
    get_messages()
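
For comparison, here is a minimal sketch of the consume flow the REST Proxy v2 API documents (create the instance, subscribe it, poll /records, delete it), reusing only the proxy URL and topic name from the script above; it is not the original code. Note that with format 'json' the proxy returns keys and values as plain JSON, so the base64 decoding above only applies to the 'binary' format:

#!/usr/bin/python3
# Sketch of the documented REST Proxy v2 flow; reuses the proxy URL and
# topic from the script above, everything else is illustrative.
import json
import requests

REST_PROXY_URL = 'http://localhost:8082'
TOPIC = 'zabbix'
V2 = 'application/vnd.kafka.v2+json'

# 1. Create a consumer instance in the 'zabbix_consumer' group.
r = requests.post(f'{REST_PROXY_URL}/consumers/zabbix_consumer',
                  headers={'Content-Type': V2},
                  data=json.dumps({'name': 'zabbix_consumer',
                                   'format': 'json',
                                   'auto.offset.reset': 'earliest'}))
r.raise_for_status()
base_uri = r.json()['base_uri']  # must be resolvable from this machine!

# 2. Subscribe the instance to the topic.
requests.post(f'{base_uri}/subscription',
              headers={'Content-Type': V2},
              data=json.dumps({'topics': [TOPIC]})).raise_for_status()

# 3. Poll records; with format=json no base64 decoding is involved.
records = requests.get(f'{base_uri}/records',
                       headers={'Accept': 'application/vnd.kafka.json.v2+json'},
                       timeout=30)
records.raise_for_status()
for record in records.json():
    print('Key:', record['key'], 'Value:', record['value'])

# 4. Delete the instance when done.
requests.delete(base_uri, headers={'Content-Type': V2})

The first poll right after subscribing often returns an empty list while the group finishes joining; polling /records again usually yields the data.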

Output

Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/urllib3/connection.py", line 159, in _new_conn
    conn = connection.create_connection(
  File "/usr/lib/python3/dist-packages/urllib3/util/connection.py", line 61, in create_connection
    for res in socket.getaddrinfo(host, port, family, socket.SOCK_STREAM):
  File "/usr/lib/python3.8/socket.py", line 918, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno -3] Temporary failure in name resolution

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 665, in urlopen
    httplib_response = self._make_request(
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 387, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/usr/lib/python3.8/http/client.py", line 1255, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/lib/python3.8/http/client.py", line 1301, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/lib/python3.8/http/client.py", line 1250, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/lib/python3.8/http/client.py", line 1010, in _send_output
    self.send(msg)
  File "/usr/lib/python3.8/http/client.py", line 950, in send
    self.connect()
  File "/usr/lib/python3/dist-packages/urllib3/connection.py", line 187, in connect
    conn = self._new_conn()
  File "/usr/lib/python3/dist-packages/urllib3/connection.py", line 171, in _new_conn
    raise NewConnectionError(
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7f0650a9be80>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/requests/adapters.py", line 439, in send
    resp = conn.urlopen(
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 719, in urlopen
    retries = retries.increment(
  File "/usr/lib/python3/dist-packages/urllib3/util/retry.py", line 436, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='rest-proxy', port=8082): Max retries exceeded with url: /consumers/zabbix_consumer/instances/zabbix_consumer/topics/zabbix (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f0650a9be80>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "consumer.py", line 48, in <module>
    get_messages()
  File "consumer.py", line 37, in get_messages
    r = requests.get(BASE_URI + f'/topics/{NAME_TOPIC}', headers=HEADERS, timeout=30)
  File "/usr/lib/python3/dist-packages/requests/api.py", line 75, in get
    return request('get', url, params=params, **kwargs)
  File "/usr/lib/python3/dist-packages/requests/api.py", line 60, in request
    return session.request(method=method, url=url, **kwargs)
  File "/usr/lib/python3/dist-packages/requests/sessions.py", line 533, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/lib/python3/dist-packages/requests/sessions.py", line 646, in send
    r = adapter.send(request, **kwargs)
  File "/usr/lib/python3/dist-packages/requests/adapters.py", line 516, in send
    raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPConnectionPool(host='rest-proxy', port=8082): Max retries exceeded with url: /consumers/zabbix_consumer/instances/zabbix_consumer/topics/zabbix (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f0650a9be80>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution'))
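
The traceback pins the failure down: requests is trying to reach the host 'rest-proxy', because the base_uri returned by the proxy is built from its host.name setting (visible as host.name = rest-proxy in the container logs below), and that name does not resolve from the client. As a purely client-side workaround sketch, assuming the proxy really listens on localhost on the same port, one could rewrite the host before using base_uri (rewrite_base_uri is a hypothetical helper, not part of requests or the proxy API):

# Hypothetical helper: swap the unresolvable 'rest-proxy' host in the
# returned base_uri for one the client can reach. Assumes the proxy is
# actually listening on the same port at 'localhost'.
from urllib.parse import urlparse, urlunparse

def rewrite_base_uri(base_uri, reachable_host='localhost'):
    parts = urlparse(base_uri)
    port = f':{parts.port}' if parts.port else ''
    return urlunparse(parts._replace(netloc=f'{reachable_host}{port}'))

# rewrite_base_uri('http://rest-proxy:8082/consumers/g/instances/i')
#   -> 'http://localhost:8082/consumers/g/instances/i'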

REST proxy container logs

[2021-05-18 02:07:12,174] INFO 127.0.0.1 - - [18/May/2021:02:07:12 +0000] "DELETE /consumers/zabbix_consumer/instances/zabbix_consumer HTTP/1.1" 404 61  3 (io.confluent.rest-utils.requests)
[2021-05-18 02:07:12,180] INFO ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-zabbix_consumer-9
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = zabbix_consumer
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 30
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    socket.connection.setup.timeout.max.ms = 127000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig)
[2021-05-18 02:07:12,184] WARN The configuration 'metrics.context.resource.cluster.id' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2021-05-18 02:07:12,184] WARN The configuration 'metrics.context._namespace' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2021-05-18 02:07:12,184] WARN The configuration 'metrics.context.resource.version' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2021-05-18 02:07:12,184] WARN The configuration 'metrics.context.resource.commit.id' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2021-05-18 02:07:12,184] WARN The configuration 'metrics.context.resource.type' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2021-05-18 02:07:12,184] WARN The configuration 'schema.registry.url' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2021-05-18 02:07:12,184] INFO Kafka version: 6.1.1-ce (org.apache.kafka.common.utils.AppInfoParser)
[2021-05-18 02:07:12,184] INFO Kafka commitId: 73deb3aeb1f8647c (org.apache.kafka.common.utils.AppInfoParser)
[2021-05-18 02:07:12,184] INFO Kafka startTimeMs: 1621303632184 (org.apache.kafka.common.utils.AppInfoParser)
[2021-05-18 02:07:12,185] INFO KafkaRestConfig values: 
    access.control.allow.headers = 
    access.control.allow.methods = 
    access.control.allow.origin = 
    access.control.skip.options = true
    advertised.listeners = []
    api.endpoints.blocklist = []
    api.v2.enable = true
    api.v3.enable = true
    authentication.method = NONE
    authentication.realm = 
    authentication.roles = [*]
    authentication.skip.paths = []
    bootstrap.servers = localhost:9092
    client.init.timeout.ms = 60000
    client.sasl.kerberos.kinit.cmd = /usr/bin/kinit
    client.sasl.kerberos.min.time.before.relogin = 60000
    client.sasl.kerberos.service.name = 
    client.sasl.kerberos.ticket.renew.jitter = 0.05
    client.sasl.kerberos.ticket.renew.window.factor = 0.8
    client.sasl.mechanism = GSSAPI
    client.security.protocol = PLAINTEXT
    client.ssl.cipher.suites = 
    client.ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1
    client.ssl.endpoint.identification.algorithm = 
    client.ssl.key.password = [hidden]
    client.ssl.keymanager.algorithm = SunX509
    client.ssl.keystore.location = 
    client.ssl.keystore.password = [hidden]
    client.ssl.keystore.type = JKS
    client.ssl.protocol = TLS
    client.ssl.provider = 
    client.ssl.trustmanager.algorithm = PKIX
    client.ssl.truststore.location = 
    client.ssl.truststore.password = [hidden]
    client.ssl.truststore.type = JKS
    client.timeout.ms = 500
    client.zk.session.timeout.ms = 30000
    compression.enable = true
    confluent.resource.name.authority = 
    consumer.instance.timeout.ms = 300000
    consumer.iterator.backoff.ms = 50
    consumer.iterator.timeout.ms = 1
    consumer.request.max.bytes = 67108864
    consumer.request.timeout.ms = 1000
    consumer.threads = 50
    csrf.prevention.enable = false
    csrf.prevention.token.endpoint = /csrf
    csrf.prevention.token.expiration.minutes = 30
    csrf.prevention.token.max.entries = 10000
    debug = false
    fetch.min.bytes = -1
    host.name = rest-proxy
    id = 
    idle.timeout.ms = 30000
    kafka.rest.resource.extension.class = []
    listeners = [http://localhost:8082]
    metric.reporters = []
    metrics.jmx.prefix = kafka.rest
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    metrics.tag.map = []
    port = 8082
    producer.threads = 5
    request.logger.name = io.confluent.rest-utils.requests
    request.queue.capacity = 2147483647
    request.queue.capacity.growby = 64
    request.queue.capacity.init = 128
    resource.extension.classes = []
    response.http.headers.config = 
    response.mediatype.default = application/json
    response.mediatype.preferred = [application/json, application/vnd.kafka.v2+json]
    rest.servlet.initializor.classes = []
    schema.registry.url = http://schema-registry:8081
    shutdown.graceful.ms = 1000
    simpleconsumer.pool.size.max = 25
    simpleconsumer.pool.timeout.ms = 1000
    ssl.cipher.suites = []
    ssl.client.auth = false
    ssl.client.authentication = NONE
    ssl.enabled.protocols = []
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = [hidden]
    ssl.keymanager.algorithm = 
    ssl.keystore.location = 
    ssl.keystore.password = [hidden]
    ssl.keystore.reload = false
    ssl.keystore.type = JKS
    ssl.keystore.watch.location = 
    ssl.protocol = TLS
    ssl.provider = 
    ssl.trustmanager.algorithm = 
    ssl.truststore.location = 
    ssl.truststore.password = [hidden]
    ssl.truststore.type = JKS
    thread.pool.max = 200
    thread.pool.min = 8
    websocket.path.prefix = /ws
    websocket.servlet.initializor.classes = []
    zookeeper.connect = 
 (io.confluent.kafkarest.KafkaRestConfig)
[2021-05-18 02:07:12,187] INFO 127.0.0.1 - - [18/May/2021:02:07:12 +0000] "POST /consumers/zabbix_consumer HTTP/1.1" 200 100  10 (io.confluent.rest-utils.requests)

docker-compose.yml

---
version: '2'
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:latest
    ports:
      - 2181:2181
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: localhost:22888:23888
    network_mode: host

  kafka-1:
    image: confluentinc/cp-kafka:latest
    ports:
      - 9092:9092
    network_mode: host
    depends_on:
      - zookeeper-1
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: localhost:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092

  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    network_mode: host
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper-1
      - kafka-1
    ports:
      - 8081:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: localhost:2181
      SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
      SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,OPTIONS'

  rest-proxy:
    image: confluentinc/cp-kafka-rest:latest
    network_mode: host
    depends_on:
      - zookeeper-1
      - kafka-1
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: localhost:9092
      KAFKA_REST_LISTENERS: http://localhost:8082
      KAFKA_REST_SCHEMA_REGISTRY_URL: http://schema-registry:8081
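
Two things in this compose file likely feed the error above: with network_mode: host the ports: mappings are ignored, and KAFKA_REST_HOST_NAME: rest-proxy makes the proxy hand out consumer instance URLs under a hostname the client machine cannot resolve. A hedged fix, assuming the client runs on the same host, would be to set KAFKA_REST_HOST_NAME: localhost (or add a rest-proxy entry to the client's /etc/hosts) so that the returned base_uri becomes reachable.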

Just use the kafka-python package instead.

pip install kafka-python

Then subscribe to your topic like this:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    "your-topic",
    bootstrap_servers="your-kafka-server",
    group_id="your-consumer-group",
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    value_deserializer=lambda x: json.loads(x.decode('utf-8')))

for record in consumer:
    data = record.value

Note that a consumer group cannot usefully have more members than the topic has partitions; the surplus consumers will just sit there without ever receiving any data. A quick way to check the partition count is sketched below.
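
A small sketch with kafka-python, using the same placeholder names as above; partitions_for_topic returns the set of partition ids, or None for an unknown topic:

from kafka import KafkaConsumer

# Sketch: count partitions so the consumer group is not over-provisioned.
# "your-kafka-server" and "your-topic" are placeholders as above.
consumer = KafkaConsumer(bootstrap_servers="your-kafka-server")
partitions = consumer.partitions_for_topic("your-topic")
print(f"your-topic has {len(partitions or [])} partition(s)")
consumer.close()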

Needless to say, the right value_deserializer depends on how the data was serialized when it was produced to the topic; a matching producer sketch follows.
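
For instance, the json.loads deserializer above pairs with a producer that wrote UTF-8 JSON; a minimal sketch with the same placeholder names and an invented example payload:

from kafka import KafkaProducer
import json

# Sketch of a matching producer: serialize dicts to UTF-8 JSON so the
# json.loads value_deserializer on the consumer side can decode them.
producer = KafkaProducer(
    bootstrap_servers="your-kafka-server",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"))

producer.send("your-topic", {"metric": "cpu", "value": 0.42})  # example payload
producer.flush()  # block until the message is actually delivered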