在 Python 中同时使用多个程序

Using multiple programs simultaneously in Python

我是 Python 的新手,我正在尝试编写一个脚本来自动执行测试。

工作原理: 程序A:通过串口发送命令等待响应,然后执行下一条命令 程序 B:使用 TCP_Client.exe 应用程序使用子进程模块将参数(ip、端口、数据包数量、大小)与应用程序一起发送到命令提示符并读取输出。

它应该做什么:

  1. 在通过串行端口访问的设备上启动 tcp 服务器(Pyserial 模块)===========> 程序 A
  2. 发送"tcpclient.exe ..."到命令提示符。这将绑定套接字,然后提示我通过串行端口向设备发送更多命令。 ==========> 程序 B
  3. 向设备发送附加命令 ==========> 程序 A
  4. 输入一个数字以继续tcpclient.exe到发送数据包的下一阶段。 ===========> 程序 B.
  5. 等待发送数据包。程序发送提示说完成但不退出。等待我读取设备上的数据 ===========> 程序 B
  6. 通过串口发送命令读取数据==========>程序A
  7. 返回tcpclient.exe并退出程序。 (基本上需要先打一个数字再继续打完。

RunSer2Command(lines2[21])
time.sleep(1)       
ls_output = subprocess.Popen(['tcpclient.exe','192.168.22.33','5000','100000','1400'],stdout=subprocess.PIPE)
time.sleep(1)
RunSer2Command(lines2[22])
RunSer2Command(lines2[23])
time.sleep(1)
ls_output = subprocess.Popen(['3'],stdout = subprocess.PIPE)
ls_output.communicate()

RunSer2Command(lines2[24])

ser2.close()

像这样

有人可以告诉我是否应该阅读多线程,或者这是否太小以至于不需要它? 如果是这样,我应该寻找什么?

提前致谢

回答我自己的问题。 简短的回答是线程会做到这一点。这也是不必要的。 subprocess 模块足以让我让它工作。我只是没做对

RunSer2Command(lines2[21])
time.sleep(1)   
ls_output = subprocess.Popen(['tcpclient.exe','192.168.4.110','8000','10000','1400'],stdin=subprocess.PIPE,stdout=subprocess.PIPE,bufsize=3)
time.sleep(2)
RunSer2Command(lines2[22])
RunSer2Command(lines2[23])
time.sleep(1)
ls_output.communicate(input = '3')
ls_output.wait()
RunSer2Command(lines2[24])

对于那些关心穿线路线的人来说,确实让我达到了某个点,我会补充一点,但我没有去到最后一个条件,因为好吧......我找到了更简单的路线

def start_tcp_client(cond): 
    ls_output = subprocess.Popen(['tcpclient.exe','192.168.4.110','8000','1000','1400'],stdin=subprocess.PIPE,stdout=subprocess.PIPE,bufsize=3)
    with cond:
        cond.wait()
        ls_output.communicate(input = '3')
        ls_output.communicate()

def TCPSettings(cond):
    with cond:
        RunSer2Command(lines2[22])
        RunSer2Command(lines2[23])
        cond.notify()

    condition = threading.Condition()
    condition1 = threading.Condition()
    Client_thread=threading.Thread(name='Client_thread', target=start_tcp_client, args=(condition,))
    TCP_thread=threading.Thread(name='TCP_thread', target=TCPSettings, args=(condition,))
    RunSer2Command(lines2[21])
    time.sleep(2)   
    Client_thread.start()
    time.sleep(2)
    TCP_thread.start()
    time.sleep(1)
    Client_thread.join()
    time.sleep(10)
    RunSer2Command(lines2[24])

我仍然没有设法解决所有问题。似乎存在一些时间问题。一旦它完美运行,我会更新它。