如何在 Python 中合并两个 IO 流?
How to merge two IO Streams in Python?
我已经围绕 Spark-Submit 命令创建了一个包装器,以便能够通过解析日志生成实时事件。
目的是创建一个显示 Spark 作业详细进度的实时界面。
因此包装器将如下所示:
submitter = SparkSubmitter()
submitter.submit('/path/to/spark-code.py')
for log_event in submitter:
if log_event:
print('Event:', log_event)
输出如下所示:
Event: StartSparkContextEvent()
Event: StartWorkEvent()
Event: FinishWorkEvent()
Event: StopSparkContextEvent()
在内部,SparkSubmitter class 将 spark-submit 命令作为 subprocess.Popen 进程启动,然后通过解析由过程,像这样:
class SparkSubmitter():
def submit(self, path):
command = self.build_spark_submit_command(path)
self.process = Popen(command, stdout=PIPE, stderr=PIPE)
def __iter__(self):
return self
def __next__(self):
# note: this is a IO-Blocking command
log = self.process.stdout.readline().decode('utf-8')
return self.parse_log_and_return_event(log)
此实现适用于 Spark 独立集群。但是当 运行 在 Yarn Cluster 上时,我遇到了问题。
在 Yarn Cluster 中,"Spark Related Logs" 出现在 stderr
中,而不是 stdout
。所以我的 class 无法解析 spark 生成的日志,因为它只是试图读取 stdout
。
问题 1:是否可以将 Popen 的标准输出和标准错误作为单个流读取?
问题2:stdout和stderr都是Streams,是否可以合并这两个流并作为一个读取?
问题 3: 是否可以将所有日志仅重定向到 stdout?
你所有 3 个问题的答案都是肯定的,你可以使用 stderr=subprocess.STDOUT
作为 Popen
的参数将输出从 stderr
重定向到 stdout
:
self.process = Popen(command, stdout=PIPE, stderr=subprocess.STDOUT)
我已经围绕 Spark-Submit 命令创建了一个包装器,以便能够通过解析日志生成实时事件。 目的是创建一个显示 Spark 作业详细进度的实时界面。
因此包装器将如下所示:
submitter = SparkSubmitter()
submitter.submit('/path/to/spark-code.py')
for log_event in submitter:
if log_event:
print('Event:', log_event)
输出如下所示:
Event: StartSparkContextEvent()
Event: StartWorkEvent()
Event: FinishWorkEvent()
Event: StopSparkContextEvent()
在内部,SparkSubmitter class 将 spark-submit 命令作为 subprocess.Popen 进程启动,然后通过解析由过程,像这样:
class SparkSubmitter():
def submit(self, path):
command = self.build_spark_submit_command(path)
self.process = Popen(command, stdout=PIPE, stderr=PIPE)
def __iter__(self):
return self
def __next__(self):
# note: this is a IO-Blocking command
log = self.process.stdout.readline().decode('utf-8')
return self.parse_log_and_return_event(log)
此实现适用于 Spark 独立集群。但是当 运行 在 Yarn Cluster 上时,我遇到了问题。
在 Yarn Cluster 中,"Spark Related Logs" 出现在 stderr
中,而不是 stdout
。所以我的 class 无法解析 spark 生成的日志,因为它只是试图读取 stdout
。
问题 1:是否可以将 Popen 的标准输出和标准错误作为单个流读取?
问题2:stdout和stderr都是Streams,是否可以合并这两个流并作为一个读取?
问题 3: 是否可以将所有日志仅重定向到 stdout?
你所有 3 个问题的答案都是肯定的,你可以使用 stderr=subprocess.STDOUT
作为 Popen
的参数将输出从 stderr
重定向到 stdout
:
self.process = Popen(command, stdout=PIPE, stderr=subprocess.STDOUT)