如何将 Python 消费者连接到 AWS MSK
How to connect Python consumer to AWS MSK
我正在尝试将我的 python 消费者连接到 AWS MSK 集群。我该怎么做?
拥有 AWS MSK 集群运行
我正在尝试使用 python 和 kafka python.
使用来自 MSK 集群的消息
我收到错误
Traceback (most recent call last):
File "consumer.py", line 23, in <module>
for message in consumer:
File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 1193, in __next__
return self.next_v2()
File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 1201, in next_v2
return next(self._iterator)
File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2
record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 655, in poll
records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 675, in _poll_once
self._coordinator.poll()
File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/coordinator/consumer.py", line 270, in poll
self.ensure_coordinator_ready()
File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/coordinator/base.py", line 258, in ensure_coordinator_ready
self._client.poll(future=future)
File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/client_async.py", line 582, in poll
self._maybe_connect(node_id)
File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/client_async.py", line 392, in _maybe_connect
conn.connect()
File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/conn.py", line 429, in connect
if self._try_handshake():
File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/conn.py", line 508, in _try_handshake
self._sock.do_handshake()
File "/usr/lib/python3.6/ssl.py", line 1077, in do_handshake
self._sslobj.do_handshake()
File "/usr/lib/python3.6/ssl.py", line 689, in do_handshake
self._sslobj.do_handshake()
OSError: [Errno 0] Error
使用kafka-python:
from kafka import KafkaConsumer
if __name__ == '__main__':
topic_name = 'example-topic'
consumer = KafkaConsumer(topic_name, auto_offset_reset='earliest',
bootstrap_servers=['kafka2:9092'], api_version=(0, 10), consumer_timeout_ms=1000)
for msg in consumer:
print(msg.value)
if consumer is not None:
consumer.close()
from time import sleep
from kafka import KafkaProducer
# publish messages on topic
def publish_message(producer_instance, topic_name, key, value):
try:
key_bytes = bytes(key, encoding='utf-8')
value_bytes = bytes(value, encoding='utf-8')
producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
producer_instance.flush()
print('Message ' + key + ' published successfully.')
except Exception as ex:
print('Exception in publishing message')
print(str(ex))
# establish kafka connection
def connect_kafka_producer():
_producer = None
try:
_producer = KafkaProducer(bootstrap_servers=['kafka1:9092'])
except Exception as ex:
print('Exception while connecting Kafka')
print(str(ex))
finally:
return _producer
if __name__ == '__main__':
kafka_producer = connect_kafka_producer()
x = 0
while True:
publish_message(kafka_producer, 'raw_recipes', str(x), 'This is message ' + str(x))
x += 1
if kafka_producer is not None:
kafka_producer.close()
我正在尝试将我的 python 消费者连接到 AWS MSK 集群。我该怎么做?
拥有 AWS MSK 集群运行 我正在尝试使用 python 和 kafka python.
使用来自 MSK 集群的消息我收到错误
Traceback (most recent call last):
File "consumer.py", line 23, in <module>
for message in consumer:
File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 1193, in __next__
return self.next_v2()
File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 1201, in next_v2
return next(self._iterator)
File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2
record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 655, in poll
records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 675, in _poll_once
self._coordinator.poll()
File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/coordinator/consumer.py", line 270, in poll
self.ensure_coordinator_ready()
File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/coordinator/base.py", line 258, in ensure_coordinator_ready
self._client.poll(future=future)
File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/client_async.py", line 582, in poll
self._maybe_connect(node_id)
File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/client_async.py", line 392, in _maybe_connect
conn.connect()
File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/conn.py", line 429, in connect
if self._try_handshake():
File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/conn.py", line 508, in _try_handshake
self._sock.do_handshake()
File "/usr/lib/python3.6/ssl.py", line 1077, in do_handshake
self._sslobj.do_handshake()
File "/usr/lib/python3.6/ssl.py", line 689, in do_handshake
self._sslobj.do_handshake()
OSError: [Errno 0] Error
使用kafka-python:
from kafka import KafkaConsumer
if __name__ == '__main__':
topic_name = 'example-topic'
consumer = KafkaConsumer(topic_name, auto_offset_reset='earliest',
bootstrap_servers=['kafka2:9092'], api_version=(0, 10), consumer_timeout_ms=1000)
for msg in consumer:
print(msg.value)
if consumer is not None:
consumer.close()
from time import sleep
from kafka import KafkaProducer
# publish messages on topic
def publish_message(producer_instance, topic_name, key, value):
try:
key_bytes = bytes(key, encoding='utf-8')
value_bytes = bytes(value, encoding='utf-8')
producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
producer_instance.flush()
print('Message ' + key + ' published successfully.')
except Exception as ex:
print('Exception in publishing message')
print(str(ex))
# establish kafka connection
def connect_kafka_producer():
_producer = None
try:
_producer = KafkaProducer(bootstrap_servers=['kafka1:9092'])
except Exception as ex:
print('Exception while connecting Kafka')
print(str(ex))
finally:
return _producer
if __name__ == '__main__':
kafka_producer = connect_kafka_producer()
x = 0
while True:
publish_message(kafka_producer, 'raw_recipes', str(x), 'This is message ' + str(x))
x += 1
if kafka_producer is not None:
kafka_producer.close()