在从 .exe 写入输出和读取输出之间拆分工作

Split work between writing output from .exe and reading the output

我从苹果应用商店下载了一个简单的 .exe。它提供有关加密货币价格及其百分比变化的实时更新。我正在提取比特币的百分比变化。

我正在使用子进程提取输出。我将输出存储到四个单独的文本文件中,然后我从文件中读取文本,提取我需要的数据并将其保存到 pandas 数据帧中。

此外,我对每个 .exe 都有 60 秒的超时时间 运行,我会在 70 秒后读取文件。我 t运行 通过删除文件内容来分类文件,当下一个文件中有输出时,我也会读取该文件,然后 t运行 分类并重复。

我想知道如何在将输出保存到文本文件和读取内容之间拆分工作,然后 运行对其进行分类。例如,我正在 运行 创建一个简单的线程,它应该 运行 .exe 并使用 write_truncate_output 提取输出。但是,我只有append_output_executable 运行ning.

这是我的脚本:

from subprocess import STDOUT, check_call as x
import os
from multiprocessing import Pool
import time
import re
from collections import defaultdict
import pandas as pd
from multiprocessing import Process

cmd = [r'/Applications/CryptoManiac.app/Contents/MacOS/CryptoManiac']
text_file = ['bitcoin1.txt','bitcoin2.txt','bitcoin3.txt','bitcoin4.txt']

def append_output_executable(cmd):
    while True:
        i = '1234'
        for num in i:
            try: #append the .exe output to multiple files 
                with open(os.devnull, 'rb') as DEVNULL, open('bitcoin{}.txt'.format(num), 'ab') as f:
                    x(cmd,  stdout=f, stderr=STDOUT, timeout=60)
                
            except:
                pass  


def write_truncate_output(text):
    while True:
        time.sleep(70)
        with open(text, 'r+') as f:
            data = f.read()
            f.truncate(0)
            #read and truncate after reading the data
    
            #filter and format
        percentage=re.findall(r'\bpercent_change_24h:\s.*', data)
        value= [x.split(':')[1] for x in percentage]
        key = [x.split(':')[0] for x in percentage]

        #store in dictionary
        percent_dict = defaultdict(list)

        for ke, val in zip(key, value):
            percent_dict[ke].append(val)
            percent_dict['file'].append(text)

        percent_frame = pd.DataFrame(percent_dict)
    
        print(percent_frame)

if __name__ == '__main__':
    for text in text_file:
        execute_process = Process(target = append_output_executable, args=(cmd,))
        output_process = Process(target = write_truncate_output, args=(text,))
        execute_process.start()
        execute_process.join()
        output_process.start()
        output_process.join()
    

这只是 运行 第一个函数。

我不知道这个答案是否解决了你所有的问题,因为你的程序错误很少 - 当你修复一个错误时,它仍然不起作用,因为还有其他错误。


第一个:

ThreadProcess 中的

target 需要没有 () 和参数的函数名称 - (这称为 callback) - 以及以后(当你使用 .start() ) 它将在新的 ThreadProcess

中使用 () 到 运行 这个函数
Thread(target=append_output_executable, args=(cmd,))

Process(target=append_output_executable, args=(cmd,))

第二个:

ThreadProcess 运行 这个函数只有一次,所以它需要 while 一直循环到 运行。而且它不能使用 return 因为它结束了功能。


第三名:

.join() 块代码因为它等待 ThreadProcess 结束并且应该在开始所有 threads/processes 之后使用 - 通常它在最后使用当您想停止所有程序时的程序 threads/processes


还有小建议:

您可以使用全局 running = True 和内部函数 while running - 稍后您可以设置 running = False 来停止函数中的循环(并完成函数)


代码可能如下所示

# ... other imports ...
from threading import Thread

def append_output_executable(cmd):
    while running:
        # ... code ... (without `return`)

def write_truncate_output(text):
    while running:
        # ... code ... (without `return`)
         
# --- main ---

# global variables

running = True

if __name__ == '__main__':
    
    # --- create and start ---
    
    t0 = Thread(target=append_output_executable, args=(cmd,))
    t0.start()
        
    other_threads = []
   
    for text in text_file:
        t = Thread(target=write_truncate_output, args=(text,))
        t.start()
        other_threads.append(t)
        
    # ... other code ...
    
    # --- at the end of program ---
    
    running = False
    
    # --- wait for end of functions ---
    
    t0.join()
    for t in other_threads:
        t.join()

Process

完全一样

(我保留相同的变量名称以表明它们都是相同的)

编辑

进程不共享内存,因此需要使用队列向进程发送信息running。所以这个版本需要修改。

# ... other imports ...
from multiprocessing import Process

def append_output_executable(cmd):
    while running:
        # ... code ... (without `return`)

def write_truncate_output(text):
    while running:
        # ... code ... (without `return`)

# --- main ---

# global variables

running = True

if __name__ == '__main__':

    # --- create and start ---

    t0 = Process(target=append_output_executable, args=(cmd,))
    t0.start()

    other_threads = []

    for text in text_file:
        t = Process(target=write_truncate_output, args=(text,))
        t.start()
        other_threads.append(t)

    # ... other code ...

    # --- at the end of program ---

    running = False

    # --- wait for end of functions ---

    t0.join()
    for t in other_threads:
        t.join()