如何在 Twisted [autobahn] websocket 服务器中实时流式传输输出?
How to stream output in realtime in Twisted[autobahn] websocket server?
我想使用 subprocess.Popen() 执行一个 C 程序并实时流式传输它的输出并将其发送到客户端。但是,输出会被缓冲并在执行结束时一起发送(阻塞性质)。我怎样才能实时接收输出,然后立即在 Twisted Autobahn 中发送它。
def onConnect(self, request):
try:
self.cont_name = ''.join(random.choice(string.lowercase) for i in range(5))
self.file_name = self.cont_name
print("Connecting...")
except Exception:
print("Failed"+str(Exception))
def onOpen(self):
try:
print("open")
except Exception:
print("Couldn't create container")
def onMessage(self, payload,isBinary=False):
cmd = "docker exec "+self.cont_name+" /tmp/./"+self.file_name
a = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, bufsize=1)
for line in iter(a.stdout.readline, b''):
line = line.encode('utf8')
self.sendMessage(line)
def onClose(self, wasClean, code, reason):
try:
print("Closed container...")
except Exception:
print(str(Exception))
当使用子进程执行 docker 命令时,c 代码的整个输出是立即 returned 而不是碰巧。例如:
#include <stdio.h>
#include <unistd.h>
int main(){
int i=0;
for(i=0;i<5;i++){
fflush(stdout);
printf("Rounded\n");
sleep(3);
}
}
在容器中 运行 之后,程序应该在 3 秒后 return 'Rounded' 到客户端。但是,它最终会在执行结束时发送所有 'Rounded'。
错误行为来自此方法中的循环:
def onMessage(self, payload,isBinary=False):
cmd = "docker exec "+self.cont_name+" /tmp/./"+self.file_name
a = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, bufsize=1)
for line in iter(a.stdout.readline, b''):
line = line.encode('utf8')
self.sendMessage(line)
Twisted 是一个协作式多任务处理系统。默认情况下,所有 运行 都在一个线程中 ("the reactor thread")。这意味着所有代码都必须周期性地(通常很快)放弃控制权,以便其他代码(应用程序代码或 Twisted 实现代码)有机会 运行。此函数中的循环从子进程读取并尝试使用 Autobahn API 发送数据 - 一遍又一遍,永不放弃控制。
阻止从 Popen 对象读取也可能会导致问题。您不会知道读取将阻塞多长时间,因此您不会知道在反应器线程中阻止其他代码 运行ning 多长时间。您可以将 Popen 读取移动到一个新线程,这样它们就不会阻塞反应器线程:
def onMessage(self, payload,isBinary=False):
cmd = "docker exec "+self.cont_name+" /tmp/./"+self.file_name
popen_in_thread(
lambda line: reactor.callFromThread(
lambda: self.sendMessage(line.encode("utf-8"))
),
[cmd], shell=True, stdout=subprocess.PIPE, bufsize=1
)
def popen_in_thread(callback, *args, **kwargs):
def threaded():
a = subprocess.Popen(*args, **kwargs)
for line in iter(a.stdout.readline, b''):
callback(line)
reactor.callInThread(threaded)
或者,更好的是,使用 Twisted 自己的进程支持:
def onMessage(self, payload,isBinary=False):
class ProcessLinesToMessages(ProcessProtocol):
def outReceived(self, output):
buf = self.buf + output
lines = buf.splitlines()
self.buf = lines.pop()
for line in lines:
self.sendMessage(line.encode("utf-8"))
while True:
line, self.buf = self.buf.splitline
reactor.spawnProcess(
ProcessLinesToMessages(),
"docker",
[
"docker",
"exec",
self.cont_name,
"/tmp/./ + self.file_name,
],
)
(两个版本都没有测试,但希望思路清晰)
我想使用 subprocess.Popen() 执行一个 C 程序并实时流式传输它的输出并将其发送到客户端。但是,输出会被缓冲并在执行结束时一起发送(阻塞性质)。我怎样才能实时接收输出,然后立即在 Twisted Autobahn 中发送它。
def onConnect(self, request):
try:
self.cont_name = ''.join(random.choice(string.lowercase) for i in range(5))
self.file_name = self.cont_name
print("Connecting...")
except Exception:
print("Failed"+str(Exception))
def onOpen(self):
try:
print("open")
except Exception:
print("Couldn't create container")
def onMessage(self, payload,isBinary=False):
cmd = "docker exec "+self.cont_name+" /tmp/./"+self.file_name
a = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, bufsize=1)
for line in iter(a.stdout.readline, b''):
line = line.encode('utf8')
self.sendMessage(line)
def onClose(self, wasClean, code, reason):
try:
print("Closed container...")
except Exception:
print(str(Exception))
当使用子进程执行 docker 命令时,c 代码的整个输出是立即 returned 而不是碰巧。例如:
#include <stdio.h>
#include <unistd.h>
int main(){
int i=0;
for(i=0;i<5;i++){
fflush(stdout);
printf("Rounded\n");
sleep(3);
}
}
在容器中 运行 之后,程序应该在 3 秒后 return 'Rounded' 到客户端。但是,它最终会在执行结束时发送所有 'Rounded'。
错误行为来自此方法中的循环:
def onMessage(self, payload,isBinary=False):
cmd = "docker exec "+self.cont_name+" /tmp/./"+self.file_name
a = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, bufsize=1)
for line in iter(a.stdout.readline, b''):
line = line.encode('utf8')
self.sendMessage(line)
Twisted 是一个协作式多任务处理系统。默认情况下,所有 运行 都在一个线程中 ("the reactor thread")。这意味着所有代码都必须周期性地(通常很快)放弃控制权,以便其他代码(应用程序代码或 Twisted 实现代码)有机会 运行。此函数中的循环从子进程读取并尝试使用 Autobahn API 发送数据 - 一遍又一遍,永不放弃控制。
阻止从 Popen 对象读取也可能会导致问题。您不会知道读取将阻塞多长时间,因此您不会知道在反应器线程中阻止其他代码 运行ning 多长时间。您可以将 Popen 读取移动到一个新线程,这样它们就不会阻塞反应器线程:
def onMessage(self, payload,isBinary=False):
cmd = "docker exec "+self.cont_name+" /tmp/./"+self.file_name
popen_in_thread(
lambda line: reactor.callFromThread(
lambda: self.sendMessage(line.encode("utf-8"))
),
[cmd], shell=True, stdout=subprocess.PIPE, bufsize=1
)
def popen_in_thread(callback, *args, **kwargs):
def threaded():
a = subprocess.Popen(*args, **kwargs)
for line in iter(a.stdout.readline, b''):
callback(line)
reactor.callInThread(threaded)
或者,更好的是,使用 Twisted 自己的进程支持:
def onMessage(self, payload,isBinary=False):
class ProcessLinesToMessages(ProcessProtocol):
def outReceived(self, output):
buf = self.buf + output
lines = buf.splitlines()
self.buf = lines.pop()
for line in lines:
self.sendMessage(line.encode("utf-8"))
while True:
line, self.buf = self.buf.splitline
reactor.spawnProcess(
ProcessLinesToMessages(),
"docker",
[
"docker",
"exec",
self.cont_name,
"/tmp/./ + self.file_name,
],
)
(两个版本都没有测试,但希望思路清晰)