Python 当一个 returns 一个结果时停止多个进程?

Python stop multiple process when one returns a result?

我正在尝试在 python 中编写一个简单的工作量证明随机数查找器。

def proof_of_work(b, nBytes):
    nonce = 0
    # while the first nBytes of hash(b + nonce) are not 0
    while sha256(b + uint2bytes(nonce))[:nBytes] != bytes(nBytes):
        nonce = nonce + 1
    return nonce

现在我正在尝试进行多处理,因此它可以使用所有 CPU 核心并更快地找到随机数。我的想法是使用 multiprocessing.Pool 并多次执行函数 proof_of_work,传递两个参数 num_of_cpus_runningthis_cpu_id,如下所示:

def proof_of_work(b, nBytes, num_of_cpus_running, this_cpu_id):
    nonce = this_cpu_id
    while sha256(b + uint2bytes(nonce))[:nBytes] != bytes(nBytes):
        nonce = nonce + num_of_cpus_running
    return nonce

所以,如果有 4 个核心,每个核心都会这样计算随机数:

core 0: 0, 4, 8, 16, 32 ...
core 1: 1, 5, 9, 17, 33 ...
core 2: 2, 6, 10, 18, 34 ...
core 3: 3, 7, 15, 31, 38 ...

所以,我必须重写 proof_of_work 所以当任何进程找到随机数时,其他所有人都停止寻找随机数,考虑到找到的随机数必须是可能的最低值所需字节为 0。如果 CPU 由于某种原因加速,并且 returns 有效随机数高于最低有效随机数,则工作量证明无效。

我唯一不知道该怎么做的部分是只有当进程 B 发现一个随机数低于进程 A 现在正在计算的随机数时,进程 A 才会停止的部分。如果它更高,A 不断计算(以防万一)直到它到达 B 提供的随机数。

希望我的解释是正确的。另外,如果我写的任何东西有更快的实现,我很想听听。非常感谢!

执行此操作的一般方法是:

  1. 考虑工作包,例如执行特定 范围 的计算,范围不应花费很长时间,比如 0.1 秒到一秒
  2. 让一些经理将工作包分发给工人
  3. 一个工作包结束后,将结果告诉经理并申请一个新的工作包
  4. 如果工作已完成并找到结果,请接受工作人员的结果并向他们发出不再执行工作的信号 - 工作人员现在可以安全地终止

这样您就不必在每次迭代时都与管理器核对(这会减慢一切),或者做一些令人讨厌的事情,比如在会话中停止线程。不用说,管理器需要线程安全。

这非常适合您的模型,因为您仍然需要其他工作人员的结果,即使已找到结果。


请注意,在您的模型中,一个线程可能与其他线程不同步,滞后。一旦找到结果,您不想再进行一百万次计算。我只是从问题中重申这一点,因为我认为模型是错误的。您应该修复模型而不是修复实现。

您可以使用 multiprocessing.Queue()。每个 CPU/process 有一个队列。当一个进程找到一个随机数时,它会将它放在其他进程的队列中。其他进程在 while 循环的每次迭代中检查它们的队列(非阻塞),如果队列中有任何东西,它们根据队列中的值决定继续或终止:

def proof_of_work(b, nBytes, num_of_cpus_running, this_cpu_id, qSelf, qOthers):
    nonce = this_cpu_id
    while sha256(b + uint2bytes(nonce))[:nBytes] != bytes(nBytes):
        nonce = nonce + num_of_cpus_running
        try:
            otherNonce = qSelf.get(block=False)
            if otherNonce < nonce:
                return
        except:
            pass
    for q in qOthers:
        q.put(nonce)
    return nonce

qOthers 是属于其他进程的队列列表(每个队列=multiprocessing.Queue())。

如果您决定按照我的建议使用队列,您应该能够编写 better/nicer 上述方法的实现。

一个简单的选择是使用微批处理并检查是否找到答案。太小的批次会导致启动并行作业的开销,太大会导致其他进程做额外的工作,而一个进程已经找到了答案。每批应该需要 1 - 10 秒才能有效。

示例代码:

from multiprocessing import Pool
from hashlib import sha256
from time import time


def find_solution(args):
    salt, nBytes, nonce_range = args
    target = '0' * nBytes

    for nonce in xrange(nonce_range[0], nonce_range[1]):
        result = sha256(salt + str(nonce)).hexdigest()

        #print('%s %s vs %s' % (result, result[:nBytes], target)); sleep(0.1)

        if result[:nBytes] == target:
            return (nonce, result)

    return None


def proof_of_work(salt, nBytes):
    n_processes = 8
    batch_size = int(2.5e5)
    pool = Pool(n_processes)

    nonce = 0

    while True:
        nonce_ranges = [
            (nonce + i * batch_size, nonce + (i+1) * batch_size)
            for i in range(n_processes)
        ]

        params = [
            (salt, nBytes, nonce_range) for nonce_range in nonce_ranges
        ]

        # Single-process search:
        #solutions = map(find_solution, params)

        # Multi-process search:
        solutions = pool.map(find_solution, params)

        print('Searched %d to %d' % (nonce_ranges[0][0], nonce_ranges[-1][1]-1))

        # Find non-None results
        solutions = filter(None, solutions)

        if solutions:
            return solutions

        nonce += n_processes * batch_size


if __name__ == '__main__':
    start = time()
    solutions = proof_of_work('abc', 6)
    print('\n'.join('%d => %s' % s for s in solutions))
    print('Solution found in %.3f seconds' % (time() - start))

输出(配备 Core i7 的笔记本电脑):

Searched 0 to 1999999
Searched 2000000 to 3999999
Searched 4000000 to 5999999
Searched 6000000 to 7999999
Searched 8000000 to 9999999
Searched 10000000 to 11999999
Searched 12000000 to 13999999
Searched 14000000 to 15999999
Searched 16000000 to 17999999
Searched 18000000 to 19999999
Searched 20000000 to 21999999
Searched 22000000 to 23999999
Searched 24000000 to 25999999
Searched 26000000 to 27999999
Searched 28000000 to 29999999
Searched 30000000 to 31999999
Searched 32000000 to 33999999
Searched 34000000 to 35999999
Searched 36000000 to 37999999
37196346 => 000000f4c9aee9d427dc94316fd49192a07f1aeca52f6b7c3bb76be10c5adf4d
Solution found in 20.536 seconds

单核耗时 76.468 秒。无论如何,这不是迄今为止找到解决方案的最有效方法,但它确实有效。例如,如果 salt 很长,那么 SHA-256 状态可以在盐被吸收后预先计算,并从那里继续暴力搜索。字节数组也可能比 hexdigest().

更有效

我想通过将 pool.map 更改为 pool.imap_unordered 来改进 NikoNyrh 的回答。使用 imap_unordered 将 return 立即从任何工人那里得到结果,而无需等待所有工人完成。因此,一旦任何结果 return 成为元组,我们就可以退出 while 循环。

def proof_of_work(salt, nBytes):
    n_processes = 8
    batch_size = int(2.5e5)
    with Pool(n_processes) as pool:

        nonce = 0

        while True:
            nonce_ranges = [
                (nonce + i * batch_size, nonce + (i+1) * batch_size)
                for i in range(n_processes)
            ]

            params = [
                (salt, nBytes, nonce_range) for nonce_range in nonce_ranges

           ]
            print('Searched %d to %d' % (nonce_ranges[0][0], nonce_ranges[-1][1]-1))

            for result in pool.imap_unordered(find_solution, params):
                if isinstance(result,tuple): return result
            
            nonce += n_processes * batch_size