在阻止呼叫期间检查条件

Check for condition during blocking call

我正在编写一些 python 代码来使用以太坊区块链。发送交易后,我 运行 等待交易收据被验证的阻塞调用。当我的程序正在等待接收收据时,我想同时检查最终会取消阻塞调用的条件。我一直在研究使用异步函数或 asyncio,但我看到的解决方案似乎对我的情况不起作用。

有关更多上下文,我有线程 1,它首先构建事务,然后进行阻塞调用以等待来自所述事务的接收:

def thread_1_func(pending_txns: dict, task_name):
  # Building transaction here and storing transaction hash in 'txn_hash'
  web3.eth.wait_for_transaction_receipt(txn_hash) # blocking call
  if pending_txns[task_name] == "Modified": # want to check this simultaneously with the blocking call
    sys.exit() # close the thread since the receipt will now never actually be received

然后我有第二个线程,如果用户修改某个文件,它只有 运行s。文件修改表明我们需要发送不同的交易来修改原始发送的交易,然后才能验证并收到收据:

def thread_2_func(pending_txns: dict, task_name):
  # build modified version of transaction and send it, store new transaction hash in 'txn_hash'
  pending_txns[task_name] = "Modified" #This will validate the condition for the other thread to stop
  web3.eth.wait_for_transaction_receipt(txn_hash) #now this thread can block and wait for receipt

同样重要的是要注意,'pending_txns' 是我目前用来在线程之间共享有关待处理事务的信息的字典。我认识到这当然可能不是线程安全的,但到目前为止它已经满足了我的需要。

我希望能够持续检查 'pending_txns' 字典的更新值,以便我可以停止线程 1。实现此目的的最佳方法是什么?

web3.eth.Eth.wait_for_transaction_receipt 函数采用可选的 timeout 参数。默认情况下,它将等待 120 秒,然后引发 web3.exceptions.TimeExhausted。您可以做的是反复等待一小段时间,捕获异常,然后检查您是否还需要等待收据。如果您不太在意取消不是立即取消,则很容易实现。这是一个 proof-of-concept :

import logging
import sys
import threading
import time


logger = logging.getLogger()  # for a clearer output


RECEIPT_WAITING_TIME = 5  # seconds


class FakeWeb3ExceptionTimeExhausted(Exception): pass


def fake_wait_for_transaction_receipt(timeout=120):
    logger.info("waiting ...")  # but it will never come
    lock = threading.Lock()
    acquired = lock.acquire()
    assert acquired
    re_acquired = lock.acquire(blocking=True, timeout=timeout)  # can re-acquire the lock (already acquired), so will block until timeout
    if not re_acquired:
        raise FakeWeb3ExceptionTimeExhausted()


THREAD1_SHOULD_STOP_WAITING = False


def thread_1_func():
    while True:
        try:
            result = fake_wait_for_transaction_receipt(timeout=RECEIPT_WAITING_TIME)
        except FakeWeb3ExceptionTimeExhausted:
            logger.info("pause waiting")
            global THREAD1_SHOULD_STOP_WAITING
            if THREAD1_SHOULD_STOP_WAITING:
                logger.info("stop waiting <--")
                break
            else:
                logger.info("resume waiting")
        else:
            logger.info("finish waiting")
            # do something with the result
            return  # don't sys.exit here


def thread_2_func():
    time.sleep(15)  # let some time pass
    # then change your mind
    logger.info("--> cancel waiting")
    global THREAD1_SHOULD_STOP_WAITING
    THREAD1_SHOULD_STOP_WAITING = True
    return  # explicit


def main():
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(logging.Formatter("%(threadName)s %(asctime)s %(message)s"))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)

    thread1 = threading.Thread(target=thread_1_func, args=())
    thread1.start()
    thread2 = threading.Thread(target=thread_2_func, args=())
    thread2.start()

    thread1.join()
    thread2.join()
    sys.exit(0)  # exit here

main()
Thread-1 2022-02-04 12:09:46,813 waiting ...
Thread-1 2022-02-04 12:09:51,814 pause waiting
Thread-1 2022-02-04 12:09:51,814 resume waiting
Thread-1 2022-02-04 12:09:51,814 waiting ...
Thread-1 2022-02-04 12:09:56,814 pause waiting
Thread-1 2022-02-04 12:09:56,814 resume waiting
Thread-1 2022-02-04 12:09:56,814 waiting ...
Thread-1 2022-02-04 12:10:01,815 pause waiting
Thread-1 2022-02-04 12:10:01,815 resume waiting
Thread-1 2022-02-04 12:10:01,815 waiting ...
Thread-2 2022-02-04 12:10:01,828 --> cancel waiting
Thread-1 2022-02-04 12:10:06,815 pause waiting
Thread-1 2022-02-04 12:10:06,815 stop waiting <--

在最坏的情况下 RECEIPT_WAITING_TIME 秒后将考虑取消。如果需要,您可以降低此值,但由于循环有开销,可能不会太低 (sub-seconds)。

相当于调用 web3.eth.Eth.get_transaction_receipt 会抛出 web3.exceptions.TransactionNotFound,然后检查 global,然后重试。

class FakeWeb3ExceptionTransactionNotFound(Exception): pass


def fake_get_transaction_receipt():
    logger.info("get ...")
    time.sleep(0.2)  # takes a bit of time
    raise FakeWeb3ExceptionTransactionNotFound()  # never found


def thread_1_func_v2():
    while True:
        try:
            result = fake_get_transaction_receipt()
        except FakeWeb3ExceptionTransactionNotFound:
            logger.info("not found")
            global THREAD1_SHOULD_STOP_WAITING
            if THREAD1_SHOULD_STOP_WAITING:
                logger.info("stop waiting <--")
                break
            else:
                logger.info("resume waiting")
        else:
            logger.info("finish waiting")
            # do something with the result
            return  # don't sys.exit here


def main2():
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(logging.Formatter("%(threadName)s %(asctime)s %(message)s"))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)

    thread1 = threading.Thread(target=thread_1_func_v2, args=())
    thread1.start()
    thread2 = threading.Thread(target=thread_2_func, args=())
    thread2.start()

    thread1.join()
    thread2.join()
    sys.exit(0)  # exit here

main2()
Thread-1 2022-02-04 12:19:59,014 get ...
Thread-1 2022-02-04 12:19:59,214 not found
Thread-1 2022-02-04 12:19:59,214 resume waiting
Thread-1 2022-02-04 12:19:59,214 get ...
Thread-1 2022-02-04 12:19:59,415 not found
Thread-1 2022-02-04 12:19:59,415 resume waiting
Thread-1 2022-02-04 12:19:59,415 get ...
[...]
Thread-1 2022-02-04 12:20:13,461 get ...
Thread-1 2022-02-04 12:20:13,661 not found
Thread-1 2022-02-04 12:20:13,661 resume waiting
Thread-1 2022-02-04 12:20:13,661 get ...
Thread-1 2022-02-04 12:20:13,862 not found
Thread-1 2022-02-04 12:20:13,862 resume waiting
Thread-1 2022-02-04 12:20:13,862 get ...
Thread-2 2022-02-04 12:20:14,029 --> cancel waiting
Thread-1 2022-02-04 12:20:14,062 not found
Thread-1 2022-02-04 12:20:14,063 stop waiting <--