根据时间限制跳过 while 循环中的步骤
Skip step in while loop subject to time constraint
我有生成许多 N 个序列并存储它们的代码。如果我使 runs
变大,那么循环可能会卡在代码的 while N[j] > 0:
部分。在 while 循环中添加一些内容以跳过一个步骤的最佳方法是什么,比如如果循环中该步骤的时间超过几秒并移至下一个步骤?
runs = 20
Lloc = 1
n0 = 1
#Empty list to store N series
N_array = []
#Empty list to store z series
z_array = []
for r in range(0,runs):
#Set initial z series values to be zero
z = [0]
#Set initial jump process values to be n0
N = [n0]
#Set iteration to be zero
j = 0
#While
while N[j] > 0:
z.append(z[j] + np.random.exponential(Lloc/(2*N[j]**2)))
#Pick jump at position j+1 to be N[j] -1 or +1 with prob 1/2
N.append(N[j] + np.random.choice([-1,1]))
#Update iteration
j = j+1
#Store N,z realisation if sum dz < 10 say
if sum(np.diff(z)) < 10:
N_array.append(N)
z_array.append(z)
#Completion
print((r+1)/runs*100,'%')
我会用这样的东西:
import time
import multiprocessing as mp
def loop_body(i):
print(f"sleeping for {i} seconds")
time.sleep(i)
print(f"sleep of {i} seconds done")
max_wait_time = 2.1
i = 1
while i < 4:
proc = mp.Process(target=loop_body, args=(i,))
proc.start()
proc.join(max_wait_time)
proc.terminate()
i += 1
想法是产生一个单独的进程并等待一定的时间。如果该过程未完成,您可以终止它并继续下一步。
你可以自己测量时间!
from time import perf_counter
#While
start = perf_counter()
while N[j] > 0:
z.append(z[j] + np.random.exponential(Lloc/(2*N[j]**2)))
#Pick jump at position j+1 to be N[j] -1 or +1 with prob 1/2
N.append(N[j] + np.random.choice([-1,1]))
#Update iteration
j = j+1
if perf_counter() - start > TIMEOUT:
break
甚至为此使用上下文:
class Timer:
def __enter__(self):
self.start = perf_counter()
return self
@property
def elapsed(self):
return perf_counter() - self.start
def __exit__(self, exc_type, exc_value, exc_traceback):
pass
with Timer() as t:
while True:
if t.elapsed > TIMEOUT:
break
正如 image357 指出的那样,这并不理想,因为我们仍然需要等到一个循环周期结束,这可能会持续很长时间,但在您的情况下,它应该可以按预期工作。
我有生成许多 N 个序列并存储它们的代码。如果我使 runs
变大,那么循环可能会卡在代码的 while N[j] > 0:
部分。在 while 循环中添加一些内容以跳过一个步骤的最佳方法是什么,比如如果循环中该步骤的时间超过几秒并移至下一个步骤?
runs = 20
Lloc = 1
n0 = 1
#Empty list to store N series
N_array = []
#Empty list to store z series
z_array = []
for r in range(0,runs):
#Set initial z series values to be zero
z = [0]
#Set initial jump process values to be n0
N = [n0]
#Set iteration to be zero
j = 0
#While
while N[j] > 0:
z.append(z[j] + np.random.exponential(Lloc/(2*N[j]**2)))
#Pick jump at position j+1 to be N[j] -1 or +1 with prob 1/2
N.append(N[j] + np.random.choice([-1,1]))
#Update iteration
j = j+1
#Store N,z realisation if sum dz < 10 say
if sum(np.diff(z)) < 10:
N_array.append(N)
z_array.append(z)
#Completion
print((r+1)/runs*100,'%')
我会用这样的东西:
import time
import multiprocessing as mp
def loop_body(i):
print(f"sleeping for {i} seconds")
time.sleep(i)
print(f"sleep of {i} seconds done")
max_wait_time = 2.1
i = 1
while i < 4:
proc = mp.Process(target=loop_body, args=(i,))
proc.start()
proc.join(max_wait_time)
proc.terminate()
i += 1
想法是产生一个单独的进程并等待一定的时间。如果该过程未完成,您可以终止它并继续下一步。
你可以自己测量时间!
from time import perf_counter
#While
start = perf_counter()
while N[j] > 0:
z.append(z[j] + np.random.exponential(Lloc/(2*N[j]**2)))
#Pick jump at position j+1 to be N[j] -1 or +1 with prob 1/2
N.append(N[j] + np.random.choice([-1,1]))
#Update iteration
j = j+1
if perf_counter() - start > TIMEOUT:
break
甚至为此使用上下文:
class Timer:
def __enter__(self):
self.start = perf_counter()
return self
@property
def elapsed(self):
return perf_counter() - self.start
def __exit__(self, exc_type, exc_value, exc_traceback):
pass
with Timer() as t:
while True:
if t.elapsed > TIMEOUT:
break
正如 image357 指出的那样,这并不理想,因为我们仍然需要等到一个循环周期结束,这可能会持续很长时间,但在您的情况下,它应该可以按预期工作。