"Too many open files" error in an app using Kafka
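I'm building an application using Kafka and Spark Streaming. Input data comes from a third-party stream and is published to a Kafka topic. The code below is from the Stream Proxy module: it shows how I receive results from the stream and pass them to KafkaPublisher (this is only a sketch):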
def on_result_response(self, *args):
    self.kafkaPublisher.pushMessage(str(args[0]))
KafkaPublisher is implemented like this:
class KafkaPublisher:
    def __init__(self, address, port, topic):
        self.kafka = KafkaClient(str(address) + ":" + str(port))
        self.producer = SimpleProducer(self.kafka)
        self.topic = topic

    def pushMessage(self, message):
        self.producer.send_messages(self.topic, message)
        self.producer = SimpleProducer(self.kafka, async=True)
The application is launched by this main program:
from StreamProxy import StreamProxy
streamProxy=StreamProxy("localhost",9092,"task1")
streamProxy.getStreaming(20) #seconds of streaming
After a few batches (roughly 10 seconds), it starts raising the following exceptions:
Exception in thread Thread-2354:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
  File "/usr/lib/python2.7/threading.py", line 754, in run
  File "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py", line 164, in _send_upstream
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 649, in send_produce_request
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 253, in _send_broker_aware_request
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 74, in _get_conn
  File "/usr/local/lib/python2.7/dist-packages/kafka/conn.py", line 236, in connect
error: [Errno 24] Too many open files

Exception in thread Thread-2355:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
  File "/usr/lib/python2.7/threading.py", line 754, in run
  File "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py", line 164, in _send_upstream
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 649, in send_produce_request
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 253, in _send_broker_aware_request
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 74, in _get_conn
  File "/usr/local/lib/python2.7/dist-packages/kafka/conn.py", line 236, in connect
error: [Errno 24] Too many open files
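Note that there are many such exceptions, raised in different threads but all with the same message, and the problem is certainly on the publisher side.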
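One way to check whether the process really is leaking descriptors is to compare the number of open descriptors with the per-process limit while the publisher runs. The following is a minimal sketch, assuming a Unix-like system; the helper name fd_usage is made up for illustration and is not part of Kafka or kafka-python.

```python
import os
import resource  # Unix only


def fd_usage():
    """Return (open_descriptors, soft_limit) for the current process."""
    soft_limit, _hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
    # /proc/self/fd is Linux-specific; /dev/fd is used as a fallback elsewhere.
    fd_dir = "/proc/self/fd" if os.path.isdir("/proc/self/fd") else "/dev/fd"
    # Listing the directory briefly opens one descriptor itself,
    # so the count is typically one higher than the steady-state value.
    open_descriptors = len(os.listdir(fd_dir))
    return open_descriptors, soft_limit


open_descriptors, soft_limit = fd_usage()
print("open descriptors: %d, soft limit: %d" % (open_descriptors, soft_limit))
```

If the first number climbs steadily with every published message until it approaches the second, something is opening sockets or files without closing them.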
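Try removing this line from pushMessage:
    self.producer = SimpleProducer(self.kafka, async=True)
It creates a new asynchronous producer on every call. Judging by the traceback (Thread-2354, Thread-2355, ...), each of those producers appears to start its own sender thread, which opens a new connection to the broker, so sockets accumulate until the process runs out of file descriptors. If you want asynchronous sends, create the async producer once in __init__ instead.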
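To illustrate the create-once pattern without a running broker, here is a minimal sketch. FakeProducer is a hypothetical stand-in that only counts how many producers were created; it is not part of kafka-python. In the real code you would construct the SimpleProducer a single time in __init__ and hand it to (or keep it in) the publisher.

```python
class FakeProducer(object):
    """Hypothetical stand-in for a Kafka producer; counts created instances."""
    created = 0

    def __init__(self):
        # Pretend every producer instance holds one socket to the broker.
        FakeProducer.created += 1
        self.sent = []

    def send_messages(self, topic, *messages):
        self.sent.extend((topic, m) for m in messages)


class KafkaPublisher(object):
    def __init__(self, producer, topic):
        # The producer is built once by the caller and reused for every message.
        self.producer = producer
        self.topic = topic

    def pushMessage(self, message):
        # Only send; never rebuild the producer here.
        self.producer.send_messages(self.topic, message)


publisher = KafkaPublisher(FakeProducer(), "task1")
for i in range(1000):
    publisher.pushMessage("message %d" % i)

print(FakeProducer.created)  # prints 1
```

With the original pushMessage, the same loop would have created 1001 producers (one in __init__ plus one per message), which is the pattern that exhausts the descriptor limit.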