没有收到来自 Kafka Topic 的消息
Not receiving messages from Kafka Topic
我在这个程序中调用 poll() 时得到的是 None,但是从 cmd 运行 kafka-console-consumer.bat 时却能收到消息,我弄不清楚问题到底出在哪里。
程序从 main.py 开始执行:
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
import time
import json
from kafka_message_consumer import KafkaMessageConsumer
from kafka_discovery_executor import KafkaDiscoveryExecutor
# Load the Kafka settings that are handed to both the consumer and the executor.
with open('kafka_properties.json') as f:
    kafka_properties = json.load(f)

# Queue through which the consumer passes polled messages to the executor.
message_queue = Queue()
kafka_message_consumer = KafkaMessageConsumer(kafka_properties, message_queue)
kafka_discovery_executor = KafkaDiscoveryExecutor(message_queue, kafka_properties)

with ThreadPoolExecutor(max_workers=5) as executor:
    # Submit the bound method itself, NOT the result of calling it.
    # ``submit(obj.run())`` evaluates ``run()`` in the main thread first; the
    # consumer's ``run()`` loops forever, so nothing would ever be handed to
    # the pool and the second submit would never be reached.
    executor.submit(kafka_message_consumer.run)
    time.sleep(1)
    executor.submit(kafka_discovery_executor.run)
    time.sleep(1)
KafkaDiscoveryExecutor class 用于使用共享队列中的消息并处理该消息。
这是kafka_message_consumer.py
import logging
from confluent_kafka import Consumer
class KafkaMessageConsumer:
    """Poll a Kafka topic and forward every received message to a shared queue."""

    def __init__(self, kafka_properties, message_queue):
        """Remember the target queue and build the subscribed consumer.

        Args:
            kafka_properties: Mapping queried with ``.get()`` for the
                ``bootstrap.servers`` and ``group.id`` settings.
            message_queue: Queue that receives each successfully polled message.
        """
        self.message_queue = message_queue
        self.logger = logging.getLogger('KafkaMessageConsumer')
        self.kafka_stream_consumer = None
        self.create_consumer(kafka_properties)

    def create_consumer(self, kafka_properties):
        """
        Create an instance of Kafka Consumer with the consumer configuration properties
        and subscribes to the defined topic(s).
        """
        # Consumer configuration properties.
        consumer_config = {
            'bootstrap.servers': kafka_properties.get('bootstrap.servers'),
            'group.id': kafka_properties.get('group.id'),
            'enable.auto.commit': True,
            'auto.offset.reset': 'earliest',
        }
        # For SSL Security
        # consumer_config['security.protocol'] = 'SASL_SSL'
        # consumer_config['sasl.mechanisms'] = 'PLAIN'
        # consumer_config['sasl.username'] = ''
        # consumer_config['sasl.password'] = ''

        # Create the consumer using consumer_config.
        self.kafka_stream_consumer = Consumer(consumer_config)
        # Subscribe to the specified topic(s).
        self.kafka_stream_consumer.subscribe(['mytopic'])

    def run(self):
        """Poll in an endless loop and queue every message that has no error.

        The loop only ends when an exception propagates out of it; the
        ``finally`` clause then closes the consumer so it is not left open.
        """
        try:
            while True:
                msg = self.kafka_stream_consumer.poll(1.0)
                if msg is None:
                    # No message available within timeout.
                    print("Waiting for message or event/error in poll()")
                    continue
                if msg.error():
                    print("Error: {}".format(msg.error()))
                    continue
                # Consume the record: push the message into message_queue.
                try:
                    self.message_queue.put(msg)
                except Exception as e:
                    self.logger.critical("Error occured in kafka Consumer: {}".format(e))
        finally:
            # Release the consumer whenever the loop is left.
            self.kafka_stream_consumer.close()
指定的主题有事件,但我在这里得到 None 并且 'if msg is None:' 中的打印语句正在执行。
我仍然不确定为什么上面的代码不能正常工作。
以下是我为使此代码正常工作所做的更改
- 我使用线程模块而不是 concurrent.futures
- 使用守护线程
- 在类 [KafkaMessageConsumer, KafkaDiscoveryExecutor] 的构造函数中调用 threading.Thread.__init__()
这里是main.py
from queue import Queue
import threading
import time
import json
from kafka_message_consumer import KafkaMessageConsumer
from kafka_discovery_executor import KafkaDiscoveryExecutor
def main():
    """Load the Kafka settings, start both worker threads, then wait on them."""
    with open('kafka_properties.json') as config_file:
        kafka_properties = json.load(config_file)

    message_queue = Queue()
    # The consumer fills the queue; the discovery executor is given the same queue.
    workers = (
        KafkaMessageConsumer(kafka_properties, message_queue),
        KafkaDiscoveryExecutor(message_queue, kafka_properties),
    )

    # Start the workers one after another, pausing a second between them.
    for worker in workers:
        worker.start()
        time.sleep(1)

    # Block the main thread until each worker finishes.
    for worker in workers:
        worker.join()
        time.sleep(1)


if __name__ == "__main__":
    main()
和kafka_message_consumer.py
import logging
from confluent_kafka import Consumer
import threading
class KafkaMessageConsumer(threading.Thread):
    """Thread that polls a Kafka topic and forwards messages to a shared queue."""

    # Class-level flag so every instance reports itself as a daemon thread.
    daemon = True

    def __init__(self, kafka_properties, message_queue):
        """Initialise the thread, remember the queue and build the consumer.

        Args:
            kafka_properties: Mapping queried with ``.get()`` for the
                ``bootstrap.servers`` and ``group.id`` settings.
            message_queue: Queue that receives each successfully polled message.
        """
        super().__init__()
        self.message_queue = message_queue
        self.logger = logging.getLogger('KafkaMessageConsumer')
        self.kafka_stream_consumer = None
        self.create_consumer(kafka_properties)

    def create_consumer(self, kafka_properties):
        """
        Create an instance of Kafka Consumer with the consumer configuration properties
        and subscribes to the defined topic(s).
        """
        # Consumer configuration properties.
        consumer_config = {
            'bootstrap.servers': kafka_properties.get('bootstrap.servers'),
            'group.id': kafka_properties.get('group.id'),
            'enable.auto.commit': True,
            'auto.offset.reset': 'earliest',
        }
        # Create the consumer using consumer_config.
        self.kafka_stream_consumer = Consumer(consumer_config)
        # Subscribe to the specified topic(s).
        self.kafka_stream_consumer.subscribe(['mytopic'])

    def run(self):
        """Poll in an endless loop and queue every message that has no error.

        The loop has no ``break``, so a ``close()`` placed after it can never
        run. It lives in a ``finally`` clause instead, which executes when an
        exception propagates out of the loop.
        """
        try:
            while True:
                msg = self.kafka_stream_consumer.poll(1.0)
                if msg is None:
                    # No message available within timeout.
                    print("Waiting for message or event/error in poll()")
                    continue
                if msg.error():
                    print("Error: {}".format(msg.error()))
                    continue
                # Consume the record: push the message into message_queue.
                try:
                    self.message_queue.put(msg)
                except Exception as e:
                    self.logger.critical("Error occured in kafka Consumer: {}".format(e))
        finally:
            # Release the consumer whenever the loop is left.
            self.kafka_stream_consumer.close()
我在这个程序中调用 poll() 时得到的是 None,但是从 cmd 运行 kafka-console-consumer.bat 时却能收到消息,我弄不清楚问题到底出在哪里。
程序从 main.py 开始执行:
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
import time
import json
from kafka_message_consumer import KafkaMessageConsumer
from kafka_discovery_executor import KafkaDiscoveryExecutor
# Load the Kafka settings that are handed to both the consumer and the executor.
with open('kafka_properties.json') as f:
    kafka_properties = json.load(f)

# Queue through which the consumer passes polled messages to the executor.
message_queue = Queue()
kafka_message_consumer = KafkaMessageConsumer(kafka_properties, message_queue)
kafka_discovery_executor = KafkaDiscoveryExecutor(message_queue, kafka_properties)

with ThreadPoolExecutor(max_workers=5) as executor:
    # Submit the bound method itself, NOT the result of calling it.
    # ``submit(obj.run())`` evaluates ``run()`` in the main thread first; the
    # consumer's ``run()`` loops forever, so nothing would ever be handed to
    # the pool and the second submit would never be reached.
    executor.submit(kafka_message_consumer.run)
    time.sleep(1)
    executor.submit(kafka_discovery_executor.run)
    time.sleep(1)
KafkaDiscoveryExecutor class 用于使用共享队列中的消息并处理该消息。
这是kafka_message_consumer.py
import logging
from confluent_kafka import Consumer
class KafkaMessageConsumer:
    """Poll a Kafka topic and forward every received message to a shared queue."""

    def __init__(self, kafka_properties, message_queue):
        """Remember the target queue and build the subscribed consumer.

        Args:
            kafka_properties: Mapping queried with ``.get()`` for the
                ``bootstrap.servers`` and ``group.id`` settings.
            message_queue: Queue that receives each successfully polled message.
        """
        self.message_queue = message_queue
        self.logger = logging.getLogger('KafkaMessageConsumer')
        self.kafka_stream_consumer = None
        self.create_consumer(kafka_properties)

    def create_consumer(self, kafka_properties):
        """
        Create an instance of Kafka Consumer with the consumer configuration properties
        and subscribes to the defined topic(s).
        """
        # Consumer configuration properties.
        consumer_config = {
            'bootstrap.servers': kafka_properties.get('bootstrap.servers'),
            'group.id': kafka_properties.get('group.id'),
            'enable.auto.commit': True,
            'auto.offset.reset': 'earliest',
        }
        # For SSL Security
        # consumer_config['security.protocol'] = 'SASL_SSL'
        # consumer_config['sasl.mechanisms'] = 'PLAIN'
        # consumer_config['sasl.username'] = ''
        # consumer_config['sasl.password'] = ''

        # Create the consumer using consumer_config.
        self.kafka_stream_consumer = Consumer(consumer_config)
        # Subscribe to the specified topic(s).
        self.kafka_stream_consumer.subscribe(['mytopic'])

    def run(self):
        """Poll in an endless loop and queue every message that has no error.

        The loop only ends when an exception propagates out of it; the
        ``finally`` clause then closes the consumer so it is not left open.
        """
        try:
            while True:
                msg = self.kafka_stream_consumer.poll(1.0)
                if msg is None:
                    # No message available within timeout.
                    print("Waiting for message or event/error in poll()")
                    continue
                if msg.error():
                    print("Error: {}".format(msg.error()))
                    continue
                # Consume the record: push the message into message_queue.
                try:
                    self.message_queue.put(msg)
                except Exception as e:
                    self.logger.critical("Error occured in kafka Consumer: {}".format(e))
        finally:
            # Release the consumer whenever the loop is left.
            self.kafka_stream_consumer.close()
指定的主题有事件,但我在这里得到 None 并且 'if msg is None:' 中的打印语句正在执行。
我仍然不确定为什么上面的代码不能正常工作。
以下是我为使此代码正常工作所做的更改
- 我使用线程模块而不是 concurrent.futures
- 使用守护线程
- 在类 [KafkaMessageConsumer, KafkaDiscoveryExecutor] 的构造函数中调用 threading.Thread.__init__()
这里是main.py
from queue import Queue
import threading
import time
import json
from kafka_message_consumer import KafkaMessageConsumer
from kafka_discovery_executor import KafkaDiscoveryExecutor
def main():
    """Load the Kafka settings, start both worker threads, then wait on them."""
    with open('kafka_properties.json') as config_file:
        kafka_properties = json.load(config_file)

    message_queue = Queue()
    # The consumer fills the queue; the discovery executor is given the same queue.
    workers = (
        KafkaMessageConsumer(kafka_properties, message_queue),
        KafkaDiscoveryExecutor(message_queue, kafka_properties),
    )

    # Start the workers one after another, pausing a second between them.
    for worker in workers:
        worker.start()
        time.sleep(1)

    # Block the main thread until each worker finishes.
    for worker in workers:
        worker.join()
        time.sleep(1)


if __name__ == "__main__":
    main()
和kafka_message_consumer.py
import logging
from confluent_kafka import Consumer
import threading
class KafkaMessageConsumer(threading.Thread):
    """Thread that polls a Kafka topic and forwards messages to a shared queue."""

    # Class-level flag so every instance reports itself as a daemon thread.
    daemon = True

    def __init__(self, kafka_properties, message_queue):
        """Initialise the thread, remember the queue and build the consumer.

        Args:
            kafka_properties: Mapping queried with ``.get()`` for the
                ``bootstrap.servers`` and ``group.id`` settings.
            message_queue: Queue that receives each successfully polled message.
        """
        super().__init__()
        self.message_queue = message_queue
        self.logger = logging.getLogger('KafkaMessageConsumer')
        self.kafka_stream_consumer = None
        self.create_consumer(kafka_properties)

    def create_consumer(self, kafka_properties):
        """
        Create an instance of Kafka Consumer with the consumer configuration properties
        and subscribes to the defined topic(s).
        """
        # Consumer configuration properties.
        consumer_config = {
            'bootstrap.servers': kafka_properties.get('bootstrap.servers'),
            'group.id': kafka_properties.get('group.id'),
            'enable.auto.commit': True,
            'auto.offset.reset': 'earliest',
        }
        # Create the consumer using consumer_config.
        self.kafka_stream_consumer = Consumer(consumer_config)
        # Subscribe to the specified topic(s).
        self.kafka_stream_consumer.subscribe(['mytopic'])

    def run(self):
        """Poll in an endless loop and queue every message that has no error.

        The loop has no ``break``, so a ``close()`` placed after it can never
        run. It lives in a ``finally`` clause instead, which executes when an
        exception propagates out of the loop.
        """
        try:
            while True:
                msg = self.kafka_stream_consumer.poll(1.0)
                if msg is None:
                    # No message available within timeout.
                    print("Waiting for message or event/error in poll()")
                    continue
                if msg.error():
                    print("Error: {}".format(msg.error()))
                    continue
                # Consume the record: push the message into message_queue.
                try:
                    self.message_queue.put(msg)
                except Exception as e:
                    self.logger.critical("Error occured in kafka Consumer: {}".format(e))
        finally:
            # Release the consumer whenever the loop is left.
            self.kafka_stream_consumer.close()