在阻止呼叫期间检查条件
Check for condition during blocking call
我正在编写一些 python 代码来使用以太坊区块链。发送交易后,我 运行 等待交易收据被验证的阻塞调用。当我的程序正在等待接收收据时,我想同时检查最终会取消阻塞调用的条件。我一直在研究使用异步函数或 asyncio,但我看到的解决方案似乎对我的情况不起作用。
有关更多上下文,我有线程 1,它首先构建事务,然后进行阻塞调用以等待来自所述事务的接收:
def thread_1_func(pending_txns: dict, task_name):
# Building transaction here and storing transaction hash in 'txn_hash'
web3.eth.wait_for_transaction_receipt(txn_hash) # blocking call
if pending_txns[task_name] == "Modified": # want to check this simultaneously with the blocking call
sys.exit() # close the thread since the receipt will now never actually be received
然后我有第二个线程,如果用户修改某个文件,它只有 运行s。文件修改表明我们需要发送不同的交易来修改原始发送的交易,然后才能验证并收到收据:
def thread_2_func(pending_txns: dict, task_name):
# build modified version of transaction and send it, store new transaction hash in 'txn_hash'
pending_txns[task_name] = "Modified" #This will validate the condition for the other thread to stop
web3.eth.wait_for_transaction_receipt(txn_hash) #now this thread can block and wait for receipt
同样重要的是要注意,'pending_txns' 是我目前用来在线程之间共享有关待处理事务的信息的字典。我认识到这当然可能不是线程安全的,但到目前为止它已经满足了我的需要。
我希望能够持续检查 'pending_txns' 字典的更新值,以便我可以停止线程 1。实现此目的的最佳方法是什么?
web3.eth.Eth.wait_for_transaction_receipt 函数采用可选的 timeout
参数。默认情况下,它将等待 120
秒,然后引发 web3.exceptions.TimeExhausted
。您可以做的是反复等待一小段时间,捕获异常,然后检查您是否还需要等待收据。如果您不太在意取消不是立即取消,则很容易实现。这是一个 proof-of-concept :
import logging
import sys
import threading
import time
logger = logging.getLogger() # for a clearer output
RECEIPT_WAITING_TIME = 5 # seconds
class FakeWeb3ExceptionTimeExhausted(Exception): pass
def fake_wait_for_transaction_receipt(timeout=120):
logger.info("waiting ...") # but it will never come
lock = threading.Lock()
acquired = lock.acquire()
assert acquired
re_acquired = lock.acquire(blocking=True, timeout=timeout) # can re-acquire the lock (already acquired), so will block until timeout
if not re_acquired:
raise FakeWeb3ExceptionTimeExhausted()
THREAD1_SHOULD_STOP_WAITING = False
def thread_1_func():
while True:
try:
result = fake_wait_for_transaction_receipt(timeout=RECEIPT_WAITING_TIME)
except FakeWeb3ExceptionTimeExhausted:
logger.info("pause waiting")
global THREAD1_SHOULD_STOP_WAITING
if THREAD1_SHOULD_STOP_WAITING:
logger.info("stop waiting <--")
break
else:
logger.info("resume waiting")
else:
logger.info("finish waiting")
# do something with the result
return # don't sys.exit here
def thread_2_func():
time.sleep(15) # let some time pass
# then change your mind
logger.info("--> cancel waiting")
global THREAD1_SHOULD_STOP_WAITING
THREAD1_SHOULD_STOP_WAITING = True
return # explicit
def main():
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(threadName)s %(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO)
thread1 = threading.Thread(target=thread_1_func, args=())
thread1.start()
thread2 = threading.Thread(target=thread_2_func, args=())
thread2.start()
thread1.join()
thread2.join()
sys.exit(0) # exit here
main()
Thread-1 2022-02-04 12:09:46,813 waiting ...
Thread-1 2022-02-04 12:09:51,814 pause waiting
Thread-1 2022-02-04 12:09:51,814 resume waiting
Thread-1 2022-02-04 12:09:51,814 waiting ...
Thread-1 2022-02-04 12:09:56,814 pause waiting
Thread-1 2022-02-04 12:09:56,814 resume waiting
Thread-1 2022-02-04 12:09:56,814 waiting ...
Thread-1 2022-02-04 12:10:01,815 pause waiting
Thread-1 2022-02-04 12:10:01,815 resume waiting
Thread-1 2022-02-04 12:10:01,815 waiting ...
Thread-2 2022-02-04 12:10:01,828 --> cancel waiting
Thread-1 2022-02-04 12:10:06,815 pause waiting
Thread-1 2022-02-04 12:10:06,815 stop waiting <--
在最坏的情况下 RECEIPT_WAITING_TIME
秒后将考虑取消。如果需要,您可以降低此值,但由于循环有开销,可能不会太低 (sub-seconds)。
相当于调用 web3.eth.Eth.get_transaction_receipt
会抛出 web3.exceptions.TransactionNotFound
,然后检查 global
,然后重试。
class FakeWeb3ExceptionTransactionNotFound(Exception): pass
def fake_get_transaction_receipt():
logger.info("get ...")
time.sleep(0.2) # takes a bit of time
raise FakeWeb3ExceptionTransactionNotFound() # never found
def thread_1_func_v2():
while True:
try:
result = fake_get_transaction_receipt()
except FakeWeb3ExceptionTransactionNotFound:
logger.info("not found")
global THREAD1_SHOULD_STOP_WAITING
if THREAD1_SHOULD_STOP_WAITING:
logger.info("stop waiting <--")
break
else:
logger.info("resume waiting")
else:
logger.info("finish waiting")
# do something with the result
return # don't sys.exit here
def main2():
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(threadName)s %(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO)
thread1 = threading.Thread(target=thread_1_func_v2, args=())
thread1.start()
thread2 = threading.Thread(target=thread_2_func, args=())
thread2.start()
thread1.join()
thread2.join()
sys.exit(0) # exit here
main2()
Thread-1 2022-02-04 12:19:59,014 get ...
Thread-1 2022-02-04 12:19:59,214 not found
Thread-1 2022-02-04 12:19:59,214 resume waiting
Thread-1 2022-02-04 12:19:59,214 get ...
Thread-1 2022-02-04 12:19:59,415 not found
Thread-1 2022-02-04 12:19:59,415 resume waiting
Thread-1 2022-02-04 12:19:59,415 get ...
[...]
Thread-1 2022-02-04 12:20:13,461 get ...
Thread-1 2022-02-04 12:20:13,661 not found
Thread-1 2022-02-04 12:20:13,661 resume waiting
Thread-1 2022-02-04 12:20:13,661 get ...
Thread-1 2022-02-04 12:20:13,862 not found
Thread-1 2022-02-04 12:20:13,862 resume waiting
Thread-1 2022-02-04 12:20:13,862 get ...
Thread-2 2022-02-04 12:20:14,029 --> cancel waiting
Thread-1 2022-02-04 12:20:14,062 not found
Thread-1 2022-02-04 12:20:14,063 stop waiting <--
我正在编写一些 python 代码来使用以太坊区块链。发送交易后,我 运行 等待交易收据被验证的阻塞调用。当我的程序正在等待接收收据时,我想同时检查最终会取消阻塞调用的条件。我一直在研究使用异步函数或 asyncio,但我看到的解决方案似乎对我的情况不起作用。
有关更多上下文,我有线程 1,它首先构建事务,然后进行阻塞调用以等待来自所述事务的接收:
def thread_1_func(pending_txns: dict, task_name):
# Building transaction here and storing transaction hash in 'txn_hash'
web3.eth.wait_for_transaction_receipt(txn_hash) # blocking call
if pending_txns[task_name] == "Modified": # want to check this simultaneously with the blocking call
sys.exit() # close the thread since the receipt will now never actually be received
然后我有第二个线程,如果用户修改某个文件,它只有 运行s。文件修改表明我们需要发送不同的交易来修改原始发送的交易,然后才能验证并收到收据:
def thread_2_func(pending_txns: dict, task_name):
# build modified version of transaction and send it, store new transaction hash in 'txn_hash'
pending_txns[task_name] = "Modified" #This will validate the condition for the other thread to stop
web3.eth.wait_for_transaction_receipt(txn_hash) #now this thread can block and wait for receipt
同样重要的是要注意,'pending_txns' 是我目前用来在线程之间共享有关待处理事务的信息的字典。我认识到这当然可能不是线程安全的,但到目前为止它已经满足了我的需要。
我希望能够持续检查 'pending_txns' 字典的更新值,以便我可以停止线程 1。实现此目的的最佳方法是什么?
web3.eth.Eth.wait_for_transaction_receipt 函数采用可选的 timeout
参数。默认情况下,它将等待 120
秒,然后引发 web3.exceptions.TimeExhausted
。您可以做的是反复等待一小段时间,捕获异常,然后检查您是否还需要等待收据。如果您不太在意取消不是立即取消,则很容易实现。这是一个 proof-of-concept :
import logging
import sys
import threading
import time
logger = logging.getLogger() # for a clearer output
RECEIPT_WAITING_TIME = 5 # seconds
class FakeWeb3ExceptionTimeExhausted(Exception): pass
def fake_wait_for_transaction_receipt(timeout=120):
logger.info("waiting ...") # but it will never come
lock = threading.Lock()
acquired = lock.acquire()
assert acquired
re_acquired = lock.acquire(blocking=True, timeout=timeout) # can re-acquire the lock (already acquired), so will block until timeout
if not re_acquired:
raise FakeWeb3ExceptionTimeExhausted()
THREAD1_SHOULD_STOP_WAITING = False
def thread_1_func():
while True:
try:
result = fake_wait_for_transaction_receipt(timeout=RECEIPT_WAITING_TIME)
except FakeWeb3ExceptionTimeExhausted:
logger.info("pause waiting")
global THREAD1_SHOULD_STOP_WAITING
if THREAD1_SHOULD_STOP_WAITING:
logger.info("stop waiting <--")
break
else:
logger.info("resume waiting")
else:
logger.info("finish waiting")
# do something with the result
return # don't sys.exit here
def thread_2_func():
time.sleep(15) # let some time pass
# then change your mind
logger.info("--> cancel waiting")
global THREAD1_SHOULD_STOP_WAITING
THREAD1_SHOULD_STOP_WAITING = True
return # explicit
def main():
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(threadName)s %(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO)
thread1 = threading.Thread(target=thread_1_func, args=())
thread1.start()
thread2 = threading.Thread(target=thread_2_func, args=())
thread2.start()
thread1.join()
thread2.join()
sys.exit(0) # exit here
main()
Thread-1 2022-02-04 12:09:46,813 waiting ...
Thread-1 2022-02-04 12:09:51,814 pause waiting
Thread-1 2022-02-04 12:09:51,814 resume waiting
Thread-1 2022-02-04 12:09:51,814 waiting ...
Thread-1 2022-02-04 12:09:56,814 pause waiting
Thread-1 2022-02-04 12:09:56,814 resume waiting
Thread-1 2022-02-04 12:09:56,814 waiting ...
Thread-1 2022-02-04 12:10:01,815 pause waiting
Thread-1 2022-02-04 12:10:01,815 resume waiting
Thread-1 2022-02-04 12:10:01,815 waiting ...
Thread-2 2022-02-04 12:10:01,828 --> cancel waiting
Thread-1 2022-02-04 12:10:06,815 pause waiting
Thread-1 2022-02-04 12:10:06,815 stop waiting <--
在最坏的情况下 RECEIPT_WAITING_TIME
秒后将考虑取消。如果需要,您可以降低此值,但由于循环有开销,可能不会太低 (sub-seconds)。
相当于调用 web3.eth.Eth.get_transaction_receipt
会抛出 web3.exceptions.TransactionNotFound
,然后检查 global
,然后重试。
class FakeWeb3ExceptionTransactionNotFound(Exception): pass
def fake_get_transaction_receipt():
logger.info("get ...")
time.sleep(0.2) # takes a bit of time
raise FakeWeb3ExceptionTransactionNotFound() # never found
def thread_1_func_v2():
while True:
try:
result = fake_get_transaction_receipt()
except FakeWeb3ExceptionTransactionNotFound:
logger.info("not found")
global THREAD1_SHOULD_STOP_WAITING
if THREAD1_SHOULD_STOP_WAITING:
logger.info("stop waiting <--")
break
else:
logger.info("resume waiting")
else:
logger.info("finish waiting")
# do something with the result
return # don't sys.exit here
def main2():
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(threadName)s %(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO)
thread1 = threading.Thread(target=thread_1_func_v2, args=())
thread1.start()
thread2 = threading.Thread(target=thread_2_func, args=())
thread2.start()
thread1.join()
thread2.join()
sys.exit(0) # exit here
main2()
Thread-1 2022-02-04 12:19:59,014 get ...
Thread-1 2022-02-04 12:19:59,214 not found
Thread-1 2022-02-04 12:19:59,214 resume waiting
Thread-1 2022-02-04 12:19:59,214 get ...
Thread-1 2022-02-04 12:19:59,415 not found
Thread-1 2022-02-04 12:19:59,415 resume waiting
Thread-1 2022-02-04 12:19:59,415 get ...
[...]
Thread-1 2022-02-04 12:20:13,461 get ...
Thread-1 2022-02-04 12:20:13,661 not found
Thread-1 2022-02-04 12:20:13,661 resume waiting
Thread-1 2022-02-04 12:20:13,661 get ...
Thread-1 2022-02-04 12:20:13,862 not found
Thread-1 2022-02-04 12:20:13,862 resume waiting
Thread-1 2022-02-04 12:20:13,862 get ...
Thread-2 2022-02-04 12:20:14,029 --> cancel waiting
Thread-1 2022-02-04 12:20:14,062 not found
Thread-1 2022-02-04 12:20:14,063 stop waiting <--