Consuming Kafka via REST Proxy with Python
I'm running a Kafka environment with Docker, and it comes up correctly!
But I can't get the REST queries in my Python script to work...
I'm trying to read all the messages received on the stream!
Any suggestions for a fix?
Sorry for the long output; I wanted to document the problem in detail to make debugging easier :)
consumer.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
import requests
import base64
import json
import sys

REST_PROXY_URL = 'http://localhost:8082'
CONSUMER_INSTACE = 'zabbix_consumer'
NAME_TOPIC = 'zabbix'


def delete_consumer(BASE_URI):
    '''Delete the consumer'''
    headers = {'Accept': 'application/vnd.kafka.v2+json'}
    r = requests.delete(BASE_URI, headers=headers)


def create_consumer():
    '''Create the Consumer instance'''
    delete_consumer(f'{REST_PROXY_URL}/consumers/{CONSUMER_INSTACE}/instances/{CONSUMER_INSTACE}')
    PAYLOAD = {'format': 'json', 'name': f'{CONSUMER_INSTACE}', 'auto.offset.reset': 'earliest'}
    HEADERS = {'Content-Type': 'application/vnd.kafka.v2+json'}
    r = requests.post(f'{REST_PROXY_URL}/consumers/{CONSUMER_INSTACE}', data=json.dumps(PAYLOAD),
                      headers=HEADERS)
    if r.status_code != 200:
        print('Status Code: ' + str(r.status_code))
        print(r.text)
        sys.exit('Error thrown while creating consumer')
    return r.json()['base_uri']


def get_messages():
    '''Get the messages from the consumer'''
    BASE_URI = create_consumer()
    HEADERS = {'Accept': 'application/vnd.kafka.v2+json'}
    r = requests.get(BASE_URI + f'/topics/{NAME_TOPIC}', headers=HEADERS, timeout=30)
    if r.status_code != 200:
        print('Status Code: ' + str(r.status_code))
        print(r.text)
        sys.exit('Error thrown while getting message')
    for message in r.json():
        if message['key'] is not None:
            print('Message Key:' + base64.b64decode(message['key']))
        print('Message Value:' + base64.b64decode(message['value']))


if __name__ == '__main__':
    get_messages()
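For reference, the REST Proxy v2 API reads records by subscribing the consumer instance and then polling its /records endpoint, rather than by fetching a topic path directly; below is a minimal sketch of that flow (endpoints per the Confluent v2 docs, names reused from the script above), not a drop-in replacement for the script:

import json
import requests

REST_PROXY_URL = 'http://localhost:8082'
V2 = 'application/vnd.kafka.v2+json'

# Create the consumer instance ('json' embedded format).
r = requests.post(f'{REST_PROXY_URL}/consumers/zabbix_consumer',
                  headers={'Content-Type': V2},
                  data=json.dumps({'name': 'zabbix_consumer',
                                   'format': 'json',
                                   'auto.offset.reset': 'earliest'}))
base_uri = r.json()['base_uri']

# Subscribe the instance to the topic.
requests.post(base_uri + '/subscription', headers={'Content-Type': V2},
              data=json.dumps({'topics': ['zabbix']}))

# Poll records; with 'json' format the keys and values come back already
# JSON-decoded, so no base64 handling is needed.
r = requests.get(base_uri + '/records',
                 headers={'Accept': 'application/vnd.kafka.json.v2+json'},
                 timeout=30)
for record in r.json():
    print(record['key'], record['value'])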
Output
Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/urllib3/connection.py", line 159, in _new_conn
    conn = connection.create_connection(
  File "/usr/lib/python3/dist-packages/urllib3/util/connection.py", line 61, in create_connection
    for res in socket.getaddrinfo(host, port, family, socket.SOCK_STREAM):
  File "/usr/lib/python3.8/socket.py", line 918, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno -3] Temporary failure in name resolution

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 665, in urlopen
    httplib_response = self._make_request(
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 387, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/usr/lib/python3.8/http/client.py", line 1255, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/lib/python3.8/http/client.py", line 1301, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/lib/python3.8/http/client.py", line 1250, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/lib/python3.8/http/client.py", line 1010, in _send_output
    self.send(msg)
  File "/usr/lib/python3.8/http/client.py", line 950, in send
    self.connect()
  File "/usr/lib/python3/dist-packages/urllib3/connection.py", line 187, in connect
    conn = self._new_conn()
  File "/usr/lib/python3/dist-packages/urllib3/connection.py", line 171, in _new_conn
    raise NewConnectionError(
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7f0650a9be80>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/requests/adapters.py", line 439, in send
    resp = conn.urlopen(
  File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 719, in urlopen
    retries = retries.increment(
  File "/usr/lib/python3/dist-packages/urllib3/util/retry.py", line 436, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='rest-proxy', port=8082): Max retries exceeded with url: /consumers/zabbix_consumer/instances/zabbix_consumer/topics/zabbix (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f0650a9be80>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "consumer.py", line 48, in <module>
    get_messages()
  File "consumer.py", line 37, in get_messages
    r = requests.get(BASE_URI + f'/topics/{NAME_TOPIC}', headers=HEADERS, timeout=30)
  File "/usr/lib/python3/dist-packages/requests/api.py", line 75, in get
    return request('get', url, params=params, **kwargs)
  File "/usr/lib/python3/dist-packages/requests/api.py", line 60, in request
    return session.request(method=method, url=url, **kwargs)
  File "/usr/lib/python3/dist-packages/requests/sessions.py", line 533, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/lib/python3/dist-packages/requests/sessions.py", line 646, in send
    r = adapter.send(request, **kwargs)
  File "/usr/lib/python3/dist-packages/requests/adapters.py", line 516, in send
    raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPConnectionPool(host='rest-proxy', port=8082): Max retries exceeded with url: /consumers/zabbix_consumer/instances/zabbix_consumer/topics/zabbix (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f0650a9be80>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution'))
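The innermost error is the tell: the GET is addressed to the host rest-proxy, the name the proxy advertises in the base_uri it returns (see KAFKA_REST_HOST_NAME in the compose file below), and that name is not resolvable from outside the Docker network. One possible workaround is to rewrite the returned base_uri to the address the proxy is actually reachable on from the host; the following is only a sketch, and the helper name is hypothetical:

from urllib.parse import urlparse, urlunparse

def rewrite_base_uri(base_uri, reachable_netloc='localhost:8082'):
    '''Hypothetical helper: swap the advertised host ('rest-proxy') for the
    host:port the proxy is actually reachable on from this machine.'''
    return urlunparse(urlparse(base_uri)._replace(netloc=reachable_netloc))

# rewrite_base_uri('http://rest-proxy:8082/consumers/zabbix_consumer/instances/zabbix_consumer')
# -> 'http://localhost:8082/consumers/zabbix_consumer/instances/zabbix_consumer'

Alternatively, setting KAFKA_REST_HOST_NAME to a name the client machine can resolve may avoid the rewrite entirely.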
REST Proxy container logs
[2021-05-18 02:07:12,174] INFO 127.0.0.1 - - [18/May/2021:02:07:12 +0000] "DELETE /consumers/zabbix_consumer/instances/zabbix_consumer HTTP/1.1" 404 61 3 (io.confluent.rest-utils.requests)
[2021-05-18 02:07:12,180] INFO ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-zabbix_consumer-9
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = zabbix_consumer
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 30
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
(org.apache.kafka.clients.consumer.ConsumerConfig)
[2021-05-18 02:07:12,184] WARN The configuration 'metrics.context.resource.cluster.id' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2021-05-18 02:07:12,184] WARN The configuration 'metrics.context._namespace' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2021-05-18 02:07:12,184] WARN The configuration 'metrics.context.resource.version' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2021-05-18 02:07:12,184] WARN The configuration 'metrics.context.resource.commit.id' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2021-05-18 02:07:12,184] WARN The configuration 'metrics.context.resource.type' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2021-05-18 02:07:12,184] WARN The configuration 'schema.registry.url' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2021-05-18 02:07:12,184] INFO Kafka version: 6.1.1-ce (org.apache.kafka.common.utils.AppInfoParser)
[2021-05-18 02:07:12,184] INFO Kafka commitId: 73deb3aeb1f8647c (org.apache.kafka.common.utils.AppInfoParser)
[2021-05-18 02:07:12,184] INFO Kafka startTimeMs: 1621303632184 (org.apache.kafka.common.utils.AppInfoParser)
[2021-05-18 02:07:12,185] INFO KafkaRestConfig values:
access.control.allow.headers =
access.control.allow.methods =
access.control.allow.origin =
access.control.skip.options = true
advertised.listeners = []
api.endpoints.blocklist = []
api.v2.enable = true
api.v3.enable = true
authentication.method = NONE
authentication.realm =
authentication.roles = [*]
authentication.skip.paths = []
bootstrap.servers = localhost:9092
client.init.timeout.ms = 60000
client.sasl.kerberos.kinit.cmd = /usr/bin/kinit
client.sasl.kerberos.min.time.before.relogin = 60000
client.sasl.kerberos.service.name =
client.sasl.kerberos.ticket.renew.jitter = 0.05
client.sasl.kerberos.ticket.renew.window.factor = 0.8
client.sasl.mechanism = GSSAPI
client.security.protocol = PLAINTEXT
client.ssl.cipher.suites =
client.ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1
client.ssl.endpoint.identification.algorithm =
client.ssl.key.password = [hidden]
client.ssl.keymanager.algorithm = SunX509
client.ssl.keystore.location =
client.ssl.keystore.password = [hidden]
client.ssl.keystore.type = JKS
client.ssl.protocol = TLS
client.ssl.provider =
client.ssl.trustmanager.algorithm = PKIX
client.ssl.truststore.location =
client.ssl.truststore.password = [hidden]
client.ssl.truststore.type = JKS
client.timeout.ms = 500
client.zk.session.timeout.ms = 30000
compression.enable = true
confluent.resource.name.authority =
consumer.instance.timeout.ms = 300000
consumer.iterator.backoff.ms = 50
consumer.iterator.timeout.ms = 1
consumer.request.max.bytes = 67108864
consumer.request.timeout.ms = 1000
consumer.threads = 50
csrf.prevention.enable = false
csrf.prevention.token.endpoint = /csrf
csrf.prevention.token.expiration.minutes = 30
csrf.prevention.token.max.entries = 10000
debug = false
fetch.min.bytes = -1
host.name = rest-proxy
id =
idle.timeout.ms = 30000
kafka.rest.resource.extension.class = []
listeners = [http://localhost:8082]
metric.reporters = []
metrics.jmx.prefix = kafka.rest
metrics.num.samples = 2
metrics.sample.window.ms = 30000
metrics.tag.map = []
port = 8082
producer.threads = 5
request.logger.name = io.confluent.rest-utils.requests
request.queue.capacity = 2147483647
request.queue.capacity.growby = 64
request.queue.capacity.init = 128
resource.extension.classes = []
response.http.headers.config =
response.mediatype.default = application/json
response.mediatype.preferred = [application/json, application/vnd.kafka.v2+json]
rest.servlet.initializor.classes = []
schema.registry.url = http://schema-registry:8081
shutdown.graceful.ms = 1000
simpleconsumer.pool.size.max = 25
simpleconsumer.pool.timeout.ms = 1000
ssl.cipher.suites = []
ssl.client.auth = false
ssl.client.authentication = NONE
ssl.enabled.protocols = []
ssl.endpoint.identification.algorithm = null
ssl.key.password = [hidden]
ssl.keymanager.algorithm =
ssl.keystore.location =
ssl.keystore.password = [hidden]
ssl.keystore.reload = false
ssl.keystore.type = JKS
ssl.keystore.watch.location =
ssl.protocol = TLS
ssl.provider =
ssl.trustmanager.algorithm =
ssl.truststore.location =
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
thread.pool.max = 200
thread.pool.min = 8
websocket.path.prefix = /ws
websocket.servlet.initializor.classes = []
zookeeper.connect =
(io.confluent.kafkarest.KafkaRestConfig)
[2021-05-18 02:07:12,187] INFO 127.0.0.1 - - [18/May/2021:02:07:12 +0000] "POST /consumers/zabbix_consumer HTTP/1.1" 200 100 10 (io.confluent.rest-utils.requests)
docker-compose.yml
---
version: '2'
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:latest
    ports:
      - 2181:2181
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: localhost:22888:23888
    network_mode: host
  kafka-1:
    image: confluentinc/cp-kafka:latest
    ports:
      - 9092:9092
    network_mode: host
    depends_on:
      - zookeeper-1
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: localhost:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    network_mode: host
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper-1
      - kafka-1
    ports:
      - 8081:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: localhost:2181
      SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
      SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,OPTIONS'
  rest-proxy:
    image: confluentinc/cp-kafka-rest:latest
    network_mode: host
    depends_on:
      - zookeeper-1
      - kafka-1
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: localhost:9092
      KAFKA_REST_LISTENERS: http://localhost:8082
      KAFKA_REST_SCHEMA_REGISTRY_URL: http://schema-registry:8081
Just use the kafka-python package.
pip install kafka-python
Then subscribe like this:
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    "your-topic",
    bootstrap_servers="your-kafka-server",
    group_id="your-consumer-group",
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    value_deserializer=lambda x: json.loads(x.decode('utf-8')))

for record in consumer:
    data = record.value
Note that a consumer group cannot have more members than the topic has partitions; otherwise the extra consumers will hang without receiving any data.
Needless to say, the value_deserializer depends on how the data was serialized when it was produced to the topic.
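For example, a producer that serializes dicts as UTF-8 JSON pairs symmetrically with the json.loads deserializer above; a minimal sketch using the same placeholder names:

from kafka import KafkaProducer
import json

# Serialize dicts to UTF-8 JSON so the consumer's json.loads
# deserializer can decode them symmetrically.
producer = KafkaProducer(
    bootstrap_servers="your-kafka-server",
    value_serializer=lambda v: json.dumps(v).encode('utf-8'))

producer.send("your-topic", {"status": "ok"})
producer.flush()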