Python 具有实时输入和多个控制台的子进程

Python subprocess with real-time input and multiple consoles

主要问题

简而言之:我的程序需要两个控制台。一个用于活跃的用户输入。另一个用于纯日志输出。 (包括已接受答案的工作代码在下面的问题文本中,在 "Edit-3" 部分下。在 "Edit-1" 部分下和"Edit-2" 部分是有效的解决方法。)

为此,我有一个主命令行 Python 脚本,它应该打开一个额外的控制台,仅用于日志输出。为此,我打算将打印在主脚本控制台上的日志输出重定向到第二个控制台的标准输入,我将其作为子进程启动。 (我使用子进程,因为我没有找到打开第二个控制台的任何其他方法。)

问题是,我似乎能够发送到第二个控制台的标准输入 - 但是,第二个控制台上没有打印任何内容。

以下是我用于实验的代码(PyDev 上的 Python 3.4 在 Windows 10 下)。函数 writing(input, pipe, process) 包含部分,其中生成的字符串被复制到作为 pipe 传递的标准输入,通过子进程打开的控制台。函数 writing(...) 是 运行 通过 class writetest(Thread)。 (我留下了一些代码,我把它注释掉了。)

import os
import sys
import io
import time
import threading
from cmd import Cmd
from queue import Queue
from subprocess import Popen, PIPE, CREATE_NEW_CONSOLE


REPETITIONS = 3


# Position of "The class" (Edit-2)


# Position of "The class" (Edit-1)


class generatetest(threading.Thread):

    def __init__(self, queue):
        self.output = queue
        threading.Thread.__init__(self)

    def run(self):
        print('run generatetest')
        generating(REPETITIONS, self.output)
        print('generatetest done')

    def getout(self):
        return self.output


class writetest(threading.Thread):

    def __init__(self, input=None, pipe=None, process=None):
        if (input == None):        # just in case
            self.input = Queue()
        else:
            self.input = input

        if (pipe == None):        # just in case
            self.pipe = PIPE
        else:
            self.pipe = pipe

        if (process == None):        # just in case
            self.process = subprocess.Popen('C:\Windows\System32\cmd.exe', universal_newlines=True, creationflags=CREATE_NEW_CONSOLE)
        else:
            self.process = proc

        threading.Thread.__init__(self)

    def run(self):
        print('run writetest')
        writing(self.input, self.pipe, self.process)
        print('writetest done')


# Position of "The function" (Edit-2)


# Position of "The function" (Edit-1)


def generating(maxint, outline):
    print('def generating')
    for i in range(maxint):
        time.sleep(1)
        outline.put_nowait(i)


def writing(input, pipe, process):
    print('def writing')
    while(True):
        try:
            print('try')
            string = str(input.get(True, REPETITIONS)) + "\n"
            pipe = io.StringIO(string)
            pipe.flush()
            time.sleep(1)
            # print(pipe.readline())
        except:
            print('except')
            break
        finally:
            print('finally')
            pass


data_queue = Queue()
data_pipe = sys.stdin
# printer = sys.stdout
# data_pipe = os.pipe()[1]


# The code of 'C:\Users\Public\Documents\test\test-cmd.py'
# can be found in the question's text further below under "More code"


exe = 'C:\Python34\python.exe'
# exe = 'C:\Windows\System32\cmd.exe'
arg = 'C:\Users\Public\Documents\test\test-cmd.py'
arguments = [exe, arg]
# proc = Popen(arguments, universal_newlines=True, creationflags=CREATE_NEW_CONSOLE)
proc = Popen(arguments, stdin=data_pipe, stdout=PIPE, stderr=PIPE,
             universal_newlines=True, creationflags=CREATE_NEW_CONSOLE)


# Position of "The call" (Edit-2 & Edit-1) - file init (proxyfile)


# Position of "The call" (Edit-2) - thread = sockettest()
# Position of "The call" (Edit-1) - thread0 = logtest()
thread1 = generatetest(data_queue)
thread2 = writetest(data_queue, data_pipe, proc)
# time.sleep(5)


# Position of "The call" (Edit-2) - thread.start()
# Position of "The call" (Edit-1) - thread0.start()
thread1.start()
thread2.start()


# Position of "The call" (Edit-2) - thread.join()
# Position of "The call" (Edit-1) - thread.join()
thread1.join(REPETITIONS * REPETITIONS)
thread2.join(REPETITIONS * REPETITIONS)

# data_queue.join()
# receiver = proc.communicate(stdin, 5)
# print('OUT:' + receiver[0])
# print('ERR:' + receiver[1])

print("1st part finished")

略有不同的方法

以下附加代码片段适用于从子进程中提取标准输出。但是,先前发送的标准输入仍未在第二个控制台上打印。另外,第二个控制台立即关闭。

proc2 = Popen(['C:\Python34\python.exe', '-i'],
              stdin=PIPE,
              stdout=PIPE,
              stderr=PIPE,
              creationflags=CREATE_NEW_CONSOLE)
proc2.stdin.write(b'2+2\n')
proc2.stdin.flush()
print(proc2.stdout.readline())
proc2.stdin.write(b'len("foobar")\n')
proc2.stdin.flush()
print(proc2.stdout.readline())
time.sleep(1)
proc2.stdin.close()
proc2.terminate()
proc2.wait(timeout=0.2)

print("Exiting Main Thread")

更多信息

一旦我使用其中一个参数 stdin=data_pipe, stdout=PIPE, stderr=PIPE 启动子进程,生成的第二个控制台就不会激活并且不接受键盘输入(这不是我们想要的,但可能会有帮助此处提供信息)。

子进程方法 communicate() 不能用于此,因为它等待进程结束。


更多代码

最后是第二个控制台的文件代码。

C:\Users\Public\Documents\test\test-cmd.py

from cmd import Cmd
from time import sleep
from datetime import datetime

INTRO = 'command line'
PROMPT = '> '


class CommandLine(Cmd):
    """Custom console"""

    def __init__(self, intro=INTRO, prompt=PROMPT):
        Cmd.__init__(self)
        self.intro = intro
        self.prompt = prompt
        self.doc_header = intro
        self.running = False

    def do_dummy(self, args):
        """Runs a dummy method."""
        print("Do the dummy.")
        self.running = True
        while(self.running == True):
            print(datetime.now())
            sleep(5)

    def do_stop(self, args):
        """Stops the dummy method."""
        print("Stop the dummy, if you can.")
        self.running = False

    def do_exit(self, args):
        """Exits this console."""
        print("Do console exit.")
        exit()

if __name__ == '__main__':
    cl = CommandLine()
    cl.prompt = PROMPT
    cl.cmdloop(INTRO)

想法

到目前为止,我什至不确定 Windows 命令行界面是否能够接受键盘以外的其他输入(而不是所需的标准输入管道或类似输入)。不过,我希望它具有某种被动模式。

为什么这不起作用?


Edit-1:通过文件解决方法(概念验证)

按照 Working multiple consoles in python 的回答中的建议,使用文件作为解决方法以显示其新内容通常是可行的。但是,由于日志文件会增长到许多 GB,因此在这种情况下这不是一个实用的解决方案。它至少需要文件拆分和正确处理它。

class:

class logtest(threading.Thread):

    def __init__(self, file):
        self.file = file
        threading.Thread.__init__(self)

    def run(self):
        print('run logtest')
        logging(self.file)
        print('logtest done')

函数:

def logging(file):
    pexe = 'C:\Python34\python.exe '
    script = 'C:\Users\Public\Documents\test\test-004.py'
    filek = '--file'
    filev = file

    file = open(file, 'a')
    file.close()
    time.sleep(1)

    print('LOG START (outer): ' + script + ' ' + filek + ' ' + filev)
    proc = Popen([pexe, script, filek, filev], universal_newlines=True, creationflags=CREATE_NEW_CONSOLE)
    print('LOG FINISH (outer): ' + script + ' ' + filek + ' ' + filev)

    time.sleep(2)

来电:

# The file tempdata is filled with several strings of "0\n1\n2\n"
# Looking like this:
# 0
# 1
# 2
# 0
# 1
# 2

proxyfile = 'C:\Users\Public\Documents\test\tempdata'
f = open(proxyfile, 'a')
f.close()
time.sleep(1)

thread0 = logtest(proxyfile)
thread0.start()
thread0.join(REPETITIONS * REPETITIONS)

尾部脚本("test-004.py"):

由于 Windows 不提供 tail 命令,我改用了以下脚本(基于 How to implement a pythonic equivalent of tail -F? 的答案),它适用于此。额外但有点不必要的 class CommandLine(Cmd) 最初是试图让第二个控制台保持打开状态(因为缺少脚本文件参数)。不过,它也证明了自己对于保持控制台流畅地打印新日志文件内容很有用。否则输出不是 deterministic/predictable.

import time
import sys
import os
import threading
from cmd import Cmd
from argparse import ArgumentParser


def main(args):
    parser = ArgumentParser(description="Parse arguments.")
    parser.add_argument("-f", "--file", type=str, default='', required=False)
    arguments = parser.parse_args(args)

    if not arguments.file:
        print('LOG PRE-START (inner): file argument not found. Creating new default entry.')
        arguments.file = 'C:\Users\Public\Documents\test\tempdata'

    print('LOG START (inner): ' + os.path.abspath(os.path.dirname(__file__)) + ' ' + arguments.file)

    f = open(arguments.file, 'a')
    f.close()
    time.sleep(1)

    words = ['word']
    console = CommandLine(arguments.file, words)
    console.prompt = ''

    thread = threading.Thread(target=console.cmdloop, args=('', ))
    thread.start()
    print("\n")

    for hit_word, hit_sentence in console.watch():
        print("Found %r in line: %r" % (hit_word, hit_sentence))

    print('LOG FINISH (inner): ' + os.path.abspath(os.path.dirname(__file__)) + ' ' + arguments.file)


class CommandLine(Cmd):
    """Custom console"""

    def __init__(self, fn, words):
        Cmd.__init__(self)
        self.fn = fn
        self.words = words

    def watch(self):
        fp = open(self.fn, 'r')
        while True:
            time.sleep(0.05)
            new = fp.readline()
            print(new)
            # Once all lines are read this just returns ''
            # until the file changes and a new line appears

            if new:
                for word in self.words:
                    if word in new:
                        yield (word, new)

            else:
                time.sleep(0.5)


if __name__ == '__main__':
    print('LOG START (inner - as main).')
    main(sys.argv[1:])

Edit-1:更多想法

我还没有尝试过但可能有用的三个变通方法是套接字(也在这个答案 Working multiple consoles in python 中建议),通过进程 ID 获取进程对象以获得更多控制,以及使用 ctypes 库用于直接访问 Windows 控制台 API,允许设置屏幕缓冲区,因为控制台可以有多个缓冲区,但只有一个活动缓冲区(在 [=126= 文档的备注中说明) ]CreateConsoleScreenBuffer 函数).

但是,使用套接字可能是最简单的一种。这样至少日志的大小无关紧要。不过,连接问题可能是这里的问题。


Edit-2:通过套接字解决方法(概念证明)

使用套接字作为解决方法来显示新的日志实体,正如 Working multiple consoles in python 的回答中所建议的那样,通常也可以正常工作。不过,这似乎太费力了,应该简单地发送到接收控制台的进程。

class:

class sockettest(threading.Thread):

    def __init__(self, host, port, file):
        self.host = host
        self.port = port
        self.file = file
        threading.Thread.__init__(self)

    def run(self):
        print('run sockettest')
        socketing(self.host, self.port, self.file)
        print('sockettest done')

函数:

def socketing(host, port, file):
    pexe = 'C:\Python34\python.exe '
    script = 'C:\Users\Public\Documents\test\test-005.py'
    hostk = '--address'
    hostv = str(host)
    portk = '--port'
    portv = str(port)
    filek = '--file'
    filev = file

    file = open(file, 'a')
    file.close()
    time.sleep(1)

    print('HOST START (outer): ' + pexe + script + ' ' + hostk + ' ' + hostv + ' ' + portk + ' ' + portv + ' ' + filek + ' ' + filev)
    proc = Popen([pexe, script, hostk, hostv, portk, portv, filek, filev], universal_newlines=True, creationflags=CREATE_NEW_CONSOLE)

    print('HOST FINISH (outer): ' + pexe + script + ' ' + hostk + ' ' + hostv + ' ' + portk + ' ' + portv + ' ' + filek + ' ' + filev)

    time.sleep(2)

来电:

# The file tempdata is filled with several strings of "0\n1\n2\n"
# Looking like this:
# 0
# 1
# 2
# 0
# 1
# 2

proxyfile = 'C:\Users\Public\Documents\test\tempdata'
f = open(proxyfile, 'a')
f.close()
time.sleep(1)

thread = sockettest('127.0.0.1', 8888, proxyfile)
thread.start()
thread.join(REPETITIONS * REPETITIONS)

套接字脚本("test-005.py"):

以下脚本基于Python: Socket programming server-client application using threads。在这里,我只是将 class CommandLine(Cmd) 作为日志条目生成器。在这一点上,将客户端放入调用第二个控制台的主脚本应该不是问题,然后将真实的日志实体而不是(新的)文件行提供给队列。 (服务器是打印机。)

import socket
import sys
import threading
import time
from cmd import Cmd
from argparse import ArgumentParser
from queue import Queue

BUFFER_SIZE = 5120

class CommandLine(Cmd):
    """Custom console"""

    def __init__(self, fn, words, queue):
        Cmd.__init__(self)
        self.fn = fn
        self.words = words
        self.queue = queue

    def watch(self):
        fp = open(self.fn, 'r')
        while True:
            time.sleep(0.05)
            new = fp.readline()

            # Once all lines are read this just returns ''
            # until the file changes and a new line appears
            self.queue.put_nowait(new)


def main(args):
    parser = ArgumentParser(description="Parse arguments.")
    parser.add_argument("-a", "--address", type=str, default='127.0.0.1', required=False)
    parser.add_argument("-p", "--port", type=str, default='8888', required=False)
    parser.add_argument("-f", "--file", type=str, default='', required=False)
    arguments = parser.parse_args(args)

    if not arguments.address:
        print('HOST PRE-START (inner): host argument not found. Creating new default entry.')
        arguments.host = '127.0.0.1'
    if not arguments.port:
        print('HOST PRE-START (inner): port argument not found. Creating new default entry.')
        arguments.port = '8888'
    if not arguments.file:
        print('HOST PRE-START (inner): file argument not found. Creating new default entry.')
        arguments.file = 'C:\Users\Public\Documents\test\tempdata'

    file_queue = Queue()

    print('HOST START (inner): ' + ' ' + arguments.address + ':' + arguments.port + ' --file ' + arguments.file)

    # Start server
    thread = threading.Thread(target=start_server, args=(arguments.address, arguments.port, ))
    thread.start()
    time.sleep(1)

    # Start client
    thread = threading.Thread(target=start_client, args=(arguments.address, arguments.port, file_queue, ))
    thread.start()

    # Start file reader
    f = open(arguments.file, 'a')
    f.close()
    time.sleep(1)

    words = ['word']
    console = CommandLine(arguments.file, words, file_queue)
    console.prompt = ''

    thread = threading.Thread(target=console.cmdloop, args=('', ))
    thread.start()
    print("\n")

    for hit_word, hit_sentence in console.watch():
        print("Found %r in line: %r" % (hit_word, hit_sentence))

    print('HOST FINISH (inner): ' + ' ' + arguments.address + ':' + arguments.port)


def start_client(host, port, queue):
    host = host
    port = int(port)         # arbitrary non-privileged port
    queue = queue

    soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    try:
        soc.connect((host, port))
    except:
        print("Client connection error" + str(sys.exc_info()))
        sys.exit()

    print("Enter 'quit' to exit")
    message = ""

    while message != 'quit':
        time.sleep(0.05)
        if(message != ""):
            soc.sendall(message.encode("utf8"))
            if soc.recv(BUFFER_SIZE).decode("utf8") == "-":
                pass        # null operation

        string = ""
        if (not queue.empty()):
            string = str(queue.get_nowait()) + "\n"

        if(string == None or string == ""):
            message = ""
        else:
            message = string

    soc.send(b'--quit--')


def start_server(host, port):
    host = host
    port = int(port)         # arbitrary non-privileged port

    soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # SO_REUSEADDR flag tells the kernel to reuse a local socket in TIME_WAIT state, without waiting for its natural timeout to expire
    soc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    print("Socket created")

    try:
        soc.bind((host, port))
    except:
        print("Bind failed. Error : " + str(sys.exc_info()))
        sys.exit()

    soc.listen(5)       # queue up to 5 requests
    print("Socket now listening")

    # infinite loop- do not reset for every requests
    while True:
        connection, address = soc.accept()
        ip, port = str(address[0]), str(address[1])
        print("Connected with " + ip + ":" + port)

        try:
            threading.Thread(target=client_thread, args=(connection, ip, port)).start()
        except:
            print("Thread did not start.")
            traceback.print_exc()

    soc.close()


def client_thread(connection, ip, port, max_buffer_size=BUFFER_SIZE):
    is_active = True

    while is_active:
        client_input = receive_input(connection, max_buffer_size)

        if "--QUIT--" in client_input:
            print("Client is requesting to quit")
            connection.close()
            print("Connection " + ip + ":" + port + " closed")
            is_active = False
        elif not client_input == "":
            print("{}".format(client_input))
            connection.sendall("-".encode("utf8"))
        else:
            connection.sendall("-".encode("utf8"))


def receive_input(connection, max_buffer_size):
    client_input = connection.recv(max_buffer_size)
    client_input_size = sys.getsizeof(client_input)

    if client_input_size > max_buffer_size:
        print("The input size is greater than expected {}".format(client_input_size))

    decoded_input = client_input.decode("utf8").rstrip()  # decode and strip end of line
    result = process_input(decoded_input)

    return result


def process_input(input_str):
    return str(input_str).upper()


if __name__ == '__main__':
    print('HOST START (inner - as main).')
    main(sys.argv[1:])

Edit-2:进一步的想法

直接控制子进程的控制台输入 pipe/buffer 将是解决此问题的最佳方案。因为这是500声望的赏金。

很遗憾,我 运行没时间了。因此,我现在可能会使用其中一种解决方法,稍后再用适当的解决方案替换它们。或者我可能必须使用核选项,只有一个控制台,在任何用户键盘输入期间暂停正在进行的日志输出,然后打印。当然这可能会导致缓冲区问题,当用户决定只输入一半时。


Edit-3:包含已接受答案的代码(一个文件)

根据 James Kent 的回答,当我通过 Windows 命令行 (cmd) 或 PowerShell 使用代码启动脚本时,我得到了所需的行为。但是,当我通过 Eclipse/PyDev 和 "Python run" 启动相同的脚本时,输出始终打印在主 Eclipse/PyDev 控制台上,而子进程的第二个控制台保持为空并保持不活动状态。不过,我想这是另一个 system/environment 专业和不同的问题。

from sys import argv, stdin, stdout
from threading import Thread
from cmd import Cmd
from time import sleep
from datetime import datetime
from subprocess import Popen, PIPE, CREATE_NEW_CONSOLE

INTRO = 'command line'
PROMPT = '> '


class CommandLine(Cmd):
    """Custom console"""

    def __init__(self, subprocess, intro=INTRO, prompt=PROMPT):
        Cmd.__init__(self)
        self.subprocess = subprocess
        self.intro = intro
        self.prompt = prompt
        self.doc_header = intro
        self.running = False

    def do_date(self, args):
        """Prints the current date and time."""
        print(datetime.now())
        sleep(1)

    def do_exit(self, args):
        """Exits this command line application."""
        print("Exit by user command.")
        if self.subprocess is not None:
            try:
                self.subprocess.terminate()
            except:
                self.subprocess.kill()
        exit()


class Console():

    def __init__(self):
        if '-r' not in argv:
            self.p = Popen(
                ['python.exe', __file__, '-r'],
                stdin=PIPE,
                creationflags=CREATE_NEW_CONSOLE
            )
        else:
            while True:
                data = stdin.read(1)
                if not data:
                    #                     break
                    sleep(1)
                    continue
                stdout.write(data)

    def write(self, data):
        self.p.stdin.write(data.encode('utf8'))
        self.p.stdin.flush()

    def getSubprocess(self):
        if self.p:
            return self.p
        else:
            return None


class Feeder (Thread):

    def __init__(self, console):
        self.console = console
        Thread.__init__(self)

    def run(self):
        feeding(self.console)


def feeding(console):
    for i in range(0, 100):
        console.write('test %i\n' % i)
        sleep(1)


if __name__ == '__main__':
    p = Console()
    if '-r' not in argv:
        thread = Feeder(p)
        thread.setDaemon(True)
        thread.start()

        cl = CommandLine(subprocess=p.getSubprocess())
        cl.use_rawinput = False
        cl.prompt = PROMPT
        cl.cmdloop('\nCommand line is waiting for user input (e.g. help).')

Edit-3:荣誉提名

在上面的问题文本中,我提到使用 ctypes 库直接访问 Windows 控制台 API 作为另一个解决方法(在 "Edit-1: More thoughts" 下)。或者以某种方式仅使用一个控制台,输入提示始终作为整个问题的核心选项保留在底部。 (在 "Edit-2: Furthermore thoughts" 下)

为了使用 ctypes 库,我会根据 Change console font in Windows. And for using just one console I would have tried the following answer to Keep console input line below output 的以下答案来定位自己。我认为这两个答案都可能提供关于这个问题的潜在优点,也许它们对其他人如何遇到这个 post 有帮助。另外,如果我有时间,我会尝试它们是否有效。

您遇到的问题是 Windows 上控制台子系统的架构,您通常看到的控制台 window 不是由 cmd.exe 托管,而是由 conhost.exe,conhost window 的子进程只能连接到一个 conhost 实例,这意味着每个进程只能连接一个 window。

然后这会导致每个控制台都有一个额外的进程 window 你希望有,然后为了查看在那个 window 中显示任何内容你需要查看标准输入和stdout 通常被处理,因为它们由 conhost 实例写入和读取,除非您将 stdin 变成管道(这样您可以写入进程)它不再来自 conhost 而是来自您的父进程等conhost 看不到它。这意味着写入 stdin 的任何内容仅由子进程读取,因此不会由 conhost 显示。

据我所知,没有办法像那样共享管道。

作为副作用,如果您将标准输入设为管道,那么发送到新控制台 window 的所有键盘输入都将无处可去,因为标准输入未连接到那个 window。

对于仅输出函数,这意味着您可以生成一个新进程,该进程通过管道与父进程通信到 stdin,并将所有内容回显到 stdout。

这是一次尝试:

#!python3

import sys, subprocess, time

class Console():
    def __init__(self):
        if '-r' not in sys.argv:
            self.p = subprocess.Popen(
                ['python.exe', __file__, '-r'],
                stdin=subprocess.PIPE,
                creationflags=subprocess.CREATE_NEW_CONSOLE
                )
        else:
            while True:
                data = sys.stdin.read(1)
                if not data:
                    break
                sys.stdout.write(data)

    def write(self, data):
        self.p.stdin.write(data.encode('utf8'))
        self.p.stdin.flush()

if (__name__ == '__main__'):
    p = Console()
    if '-r' not in sys.argv:
        for i in range(0, 100):
            p.write('test %i\n' % i)
            time.sleep(1)

所以两个进程之间的一个很好的简单管道,如果它是子进程,则将输入回显到输出,我使用 -r 来表示实例是否是一个进程,但还有其他方法取决于你如何实现它.

注意几点:

  • 写入标准输入后需要刷新,因为 python 通常使用缓冲。
  • 这种方法的编写方式旨在在其自己的模块中使用 __file__
  • 由于使用 __file__,如果使用 cx_Freeze 或类似方法冻结,此方法可能需要修改。

编辑 1

对于可以用 cx_Freeze 冻结的版本:

Console.py

import sys, subprocess

class Console():
    def __init__(self, ischild=True):
        if not ischild:
            if hasattr(sys, 'frozen'):
                args = ['Console.exe']
            else:
                args = [sys.executable, __file__]
            self.p = subprocess.Popen(
                args,
                stdin=subprocess.PIPE,
                creationflags=subprocess.CREATE_NEW_CONSOLE
                )
        else:
            while True:
                data = sys.stdin.read(1)
                if not data:
                    break
                sys.stdout.write(data)

    def write(self, data):
        self.p.stdin.write(data.encode('utf8'))
        self.p.stdin.flush()

if (__name__ == '__main__'):
    p = Console()

test.py

from Console import Console
import sys, time

if (__name__ == '__main__'):
    p = Console(False)
    for i in range(0, 100):
        p.write('test %i\n' % i)
        time.sleep(1)

setup.py

from cx_Freeze import setup, Executable

setup(
    name = 'Console-test',
    executables = [
        Executable(
            'Console.py',
            base=None,
            ),
        Executable(
            'test.py',
            base=None,
            )
        ]
)

编辑 2

应该可以在像 IDLE 这样的开发工具下工作的新版本

Console.py

#!python3

import ctypes, sys, subprocess

Kernel32 = ctypes.windll.Kernel32

class Console():
    def __init__(self, ischild=True):
        if ischild:
            # try allocate new console
            result = Kernel32.AllocConsole()
            if result > 0:
                # if we succeed open handle to the console output
                sys.stdout = open('CONOUT$', mode='w')
        else:
            # if frozen we assume its names Console.exe
            # note that when frozen 'Win32GUI' must be used as a base
            if hasattr(sys, 'frozen'):
                args = ['Console.exe']
            else:
                # otherwise we use the console free version of python
                args = ['pythonw.exe', __file__]
            self.p = subprocess.Popen(
                args,
                stdin=subprocess.PIPE
                )
            return
        while True:
            data = sys.stdin.read(1)
            if not data:
                break
            sys.stdout.write(data)

    def write(self, data):
        self.p.stdin.write(data.encode('utf8'))
        self.p.stdin.flush()

if (__name__ == '__main__'):
    p = Console()

test.py

from Console import Console
import sys, time

if (__name__ == '__main__'):
    p = Console(False)
    for i in range(0, 100):
        p.write('test %i\n' % i)
        time.sleep(1)

setup.py

from cx_Freeze import setup, Executable

setup(
    name = 'Console-test',
    executables = [
        Executable(
            'Console.py',
            base='Win32GUI',
            ),
        Executable(
            'test.py',
            base=None,
            )
        ]
)

这可以变得更健壮,即始终检查现有控制台并在创建新控制台之前将其分离,并可能更好地处理错误。

由于您在 windows,您可以使用 win32console 模块为您的线程或子进程输出打开第二个控制台或多个控制台。如果您使用 windows.

,这是最简单易行的方法

这是一个示例代码:

import win32console
import multiprocessing

def subprocess(queue):
    win32console.FreeConsole() #Frees subprocess from using main console
    win32console.AllocConsole() #Creates new console and all input and output of subprocess goes to this new console
    while True:
        print(queue.get())
        #prints any output produced by main script passed to subprocess using queue

if __name__ == "__main__": 
    queue = multiprocessing.Queue()
    multiprocessing.Process(target=subprocess, args=[queue]).start()
    while True:
        print("Hello World in main console")
        queue.put("Hello work in sub process console")
        #sends above string to subprocess and it prints it into its console

        #and whatever else you want to do in ur main process

您也可以使用线程来完成此操作。如果你想要队列功能,你必须使用队列模块,因为线程模块没有队列

这里是win32console module documentation