Writing data to disc in a separate thread (in parallel)
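I want to start a function several times from within a loop. Each call acquires an image from a camera and writes it to disc, and the loop should not have to wait for that to finish. Each call should therefore run in parallel with the loop that started it, so that I can carry on with other time-critical work in the meantime.
I made this example, in which the first "execution" of the function runs in parallel with the loop, but the second attempt fails because I cannot call .start() twice. Can this be achieved some other way?
Example (original post - update below)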
import numpy as np
import threading
import time

def imacq():
    print('acquiring image...')
    time.sleep(1.8)
    print('saved image...')
    return

# Start image acquisition and writing-to-disc thread
imacq_thread = threading.Thread(target=imacq)

starttime = time.time()
sig_arr = np.zeros(100)
tim_arr = np.zeros(100)
image_cycles = 5
running = True
flag = True
for cycles in range(1,20):
    print(cycles)
    if cycles%image_cycles == 0:
        if flag is True:
            imacq_thread.start()  # this works well the first time as intended
            # imacq()  # this does not work as everything is paused until imacq() returns
            flag = False
    else:
        flag = True
    time.sleep(0.4)
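For reference, the second .start() fails because a threading.Thread object can be started only once; a second call raises RuntimeError. A minimal sketch of the most direct workaround, creating a fresh Thread for every trigger, is shown below. The `results` list, the `run_loop` helper, the thread names and the shortened sleep times are assumptions made for illustration and are not part of the original post.

```python
import threading
import time


def imacq(results):
    """Stand-in for the camera grab and disk write (the sleep is a placeholder)."""
    time.sleep(0.05)
    results.append(threading.current_thread().name)


def run_loop(n_cycles=10, image_cycles=5):
    """Start a new Thread on every image cycle; return the names of threads that ran."""
    results = []
    threads = []
    for cycle in range(1, n_cycles + 1):
        if cycle % image_cycles == 0:
            # A Thread object can be started only once, so build a new one each time.
            t = threading.Thread(target=imacq, args=(results,),
                                 name=f"imacq-{cycle}")
            t.start()
            threads.append(t)
        time.sleep(0.01)  # the loop keeps running while imacq works
    for t in threads:
        t.join()  # wait for outstanding writes before returning
    return results
```

This keeps the loop responsive, but it creates one thread per trigger with no upper bound, which is one reason a pool or a long-lived worker may be preferable.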
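EDIT: based on Sylvaus's feedback:
I have made two different versions for triggering a function, which will eventually acquire and store images on a drive, in parallel with a main script that decides when to send the trigger / execute the function. One version is based on Sylvaus's answer (threading) and the other on multiprocessing.
Example based on Sylvaus's answer (threading):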
import matplotlib.pyplot as plt
import numpy as np
import time
from concurrent.futures import ThreadPoolExecutor

def imacq():
    print('taking image')
    n = 10000
    np.ones((n, n))*np.ones((n, n))  # calculations taking time
    print('saving image')
    return

sig_arr = np.zeros(100)
tim_arr = np.zeros(100)
image_cycles = 20
max_cycles = 100
freq = 10
cycles = 1
sigSign = 1
running = True
flag = True
timeinc = []
tic = time.time()
tic2 = tic
timeinc = np.zeros(max_cycles)
starttime = time.time()
with ThreadPoolExecutor() as executor:
    while running:
        t = time.time()-starttime
        tim_arr[:-1] = tim_arr[1:]
        tim_arr[-1] = t
        signal = np.sin(freq*t*(2.0*np.pi))
        sig_arr[:-1] = sig_arr[1:]
        sig_arr[-1] = signal
        time.sleep(0.00001)
        # Calculate cycle number
        sigSignOld = sigSign
        sigSign = np.sign(sig_arr[-1]-sig_arr[-2])
        if sigSign == 1 and sigSignOld != sigSign:
            timeinc[cycles] = time.time()-tic
            cycles += 1
            print('cycles: ', cycles, ' time inc.: ', str(timeinc[cycles-1]))
            tic = time.time()
        if cycles%image_cycles == 0:
            if flag is True:
                # The function is submitted and will be processed by
                # a thread as soon as one is available
                executor.submit(imacq)
                flag = False
        else:
            flag = True
        if cycles >= max_cycles:
            running = False
print('total time: ', time.time()-tic2)
fig = plt.figure()
ax = plt.axes()
plt.plot(timeinc)
Example based on multiprocessing:
import matplotlib.pyplot as plt
import numpy as np
import time
from multiprocessing import Process, Value, Lock

def trig_resp(running, trigger, p_count, pt, lock):
    while running.value == 1:  # note ".value" on each sharedctype variable
        time.sleep(0.0001)  # sleeping in order not to load the CPU excessively
        if trigger.value == 1:
            with lock:  # lock "global" variable before writing to it
                trigger.value = 0  # reset trigger
            tic = time.time()
            # Do a calculation that takes a significant time
            n = 10000; np.ones((n, n))*np.ones((n, n))
            with lock:
                pt.value = time.time() - tic  # calculate process time
                p_count.value += 1  # count number of finished processes
    return

if __name__ == "__main__":
    # initialize shared values (global across processes/sharedctype).
    # Type 'i': integer, type 'd': double.
    trigger = Value('i', 0)  # used to trigger execution placed in trig_resp()
    running = Value('i', 1)  # A way to break the loop in trig_resp()
    p_count = Value('i', 0)  # process counter and flag that process is done
    pt = Value('d', 0.0)  # process time of latest finished process
    lock = Lock()  # lock object used to avoid race conditions when changing "global" values.
    p_count_old = p_count.value
    p1 = Process(target=trig_resp, args=(running, trigger, p_count, pt, lock))
    p1.start()  # Start process
    # A "simulated" sinusoidal signal
    array_len = 50
    sig_arr = np.zeros(array_len)  # Signal array
    tim_arr = np.zeros(array_len)  # Corresponding time array
    freq = 10  # frequency of signal
    # trigger settings
    im_int = 20  # cycle interval for triggering (acquiring images)
    max_cycles = 100  # max number of cycles before stopping main
    # initializing counters etc.
    cycles = 1  # number of cycles counted
    sigSign = 1  # sign of signal gradient
    flag = 1  # used to only set trigger once for the current cycle count
    trigger_count = 0  # counts how many times a trigger has been set
    tic = time.time()
    tic2 = tic
    timeinc = np.zeros(max_cycles)  # Array to keep track of time used for each main loop run
    starttime = time.time()
    while running.value == 1:
        time.sleep(0.00001)  # mimics sample time (real world signal)
        t = time.time()-starttime  # local time
        signal = np.sin(freq*t*(2.0*np.pi))  # simulated signal
        # Keeping the latest array_len values (FIFO) of t and signal.
        tim_arr[:-1] = tim_arr[1:]
        tim_arr[-1] = t
        sig_arr[:-1] = sig_arr[1:]
        sig_arr[-1] = signal
        if p_count.value == p_count_old + 1:  # process has finished
            print('Process counter: ', p_count.value, 'process_time: ', pt.value)
            p_count_old = p_count.value
        # Calculate cycle number by monitoring the sign of the gradient
        sigSignOld = sigSign  # Keeping track of previous signal gradient sign
        sigSign = np.sign(sig_arr[-1]-sig_arr[-2])  # current gradient sign
        if sigSign == 1 and sigSignOld == -1:  # a local minimum just happened
            timeinc[cycles] = time.time()-tic
            cycles += 1
            print('cycles: ', cycles, ' time inc.: ', str(timeinc[cycles-1]))
            tic = time.time()
            flag = 1
        if cycles % im_int == 0 and flag == 1:
            if cycles > 0:
                if trigger_count > p_count.value:
                    print('WARNING: Process: ', p_count.value,
                          'did not finish yet. Reduce freq or increase im_int')
                trigger.value = 1
                trigger_count += 1
                print('Trigger number: ', trigger_count)
                flag = 0
        if cycles >= max_cycles:
            running.value = 0
    print('total cycle time: ', time.time()-tic2)
    # Print the process time of the last run
    if p_count.value < max_cycles//im_int:
        if p_count.value == p_count_old + 1:
            print('process counter: ', p_count.value, 'process_time: ', pt.value)
            p_count_old = p_count.value
    print('total process time: ', time.time()-tic2)
    fig = plt.figure()
    ax = plt.axes()
    plt.plot(timeinc)
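I am on a Windows 10 laptop, so the timing (the time increment of each pass of the main while loop, "while running...:") depends on whatever else is happening on my computer, but the multiprocessing-based version seems less sensitive to this than the threading-based one. However, the multiprocessing version is not very elegant, and I suspect a smarter solution (simpler and less error-prone) could achieve the same or better (consistent time increments with low CPU load).
I have attached plots of the time increments for the multiprocessing and threading examples, respectively:
Any feedback on improving the two solutions is greatly appreciated.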
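You could use an Executor. This way you simply submit your tasks, and they are handled according to the type of Executor you use.
I do not know what is in your imacq, so you may have to try both ThreadPoolExecutor and ProcessPoolExecutor to find the one that suits your application best.
Example: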
import numpy as np
import time
from concurrent.futures import ThreadPoolExecutor

def imacq():
    print('acquiring image...')
    time.sleep(1.8)
    print('saved image...')
    return

starttime = time.time()
sig_arr = np.zeros(100)
tim_arr = np.zeros(100)
image_cycles = 5
running = True
flag = True
with ThreadPoolExecutor() as executor:
    for cycles in range(1,20):
        print(cycles)
        if cycles%image_cycles == 0:
            if flag is True:
                # The function is submitted and will be processed by
                # a thread as soon as one is available
                executor.submit(imacq)
                flag = False
        else:
            flag = True
        time.sleep(0.4)
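One thing to keep in mind with executor.submit(): it returns a Future, and an exception raised inside the task stays in that Future until someone asks for it, so a failed disk write can go unnoticed. The sketch below shows one way to surface results and errors without blocking the loop, using Future.add_done_callback(). The `cycle` argument, the `report` and `run` helpers, the file name pattern and the simulated failure are assumptions added for illustration only.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def imacq(cycle):
    """Stand-in for acquire-and-save; fails on one cycle to show error reporting."""
    time.sleep(0.05)
    if cycle == 10:
        raise OSError("disk write failed (simulated)")
    return f"image_{cycle:03d}.png"  # hypothetical file name


def report(future, log):
    """Done-callback: record either the result or the exception of a finished task."""
    exc = future.exception()
    log.append(repr(exc) if exc else future.result())


def run(n_cycles=15, image_cycles=5):
    log = []
    with ThreadPoolExecutor(max_workers=2) as executor:
        for cycle in range(1, n_cycles + 1):
            if cycle % image_cycles == 0:
                future = executor.submit(imacq, cycle)  # returns immediately
                future.add_done_callback(lambda f: report(f, log))
            time.sleep(0.01)  # the loop is not held up by imacq
    # Leaving the with-block waits for all submitted tasks to finish.
    return log
```

The callback normally runs in the worker thread that finished the task, so it should stay short; here it only appends to a list.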
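The details of your acquisition devices, data rates and volumes are not very clear, but my impression is that you want to acquire a signal as fast as possible and, whenever that signal is "interesting", capture an image and write it to disk as soon as possible without delaying the next signal acquisition.
So the data exchange needed between the main signal acquisition process and the image capture process seems minimal. IMHO, that suggests using multiprocessing (therefore no GIL) and a Queue (no large amounts of data to pickle) to communicate between the two processes.
So, I would look at this type of setup: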
#!/usr/bin/env python3
from multiprocessing import Process, Queue, freeze_support

def ImageCapture(queue):
    while True:
        # Wait till told to capture image - message could contain event reference number
        item = queue.get()
        if item == -1:
            break
        # Capture image and save to disk

def main():
    # Create queue to send image capture requests on
    queue = Queue(8)
    # Start image acquisition process
    p = Process(target=ImageCapture, args=(queue,))
    p.start()
    # do forever
    #    acquire from DAQ
    #    if interesting
    #       queue.put(event reference number or filename)
    # Stop image acquisition process
    queue.put(-1)
    p.join()

if __name__ == "__main__":
    # Some Windows thing: needed when frozen into an executable, harmless otherwise
    freeze_support()
    main()
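If the ImageCapture() process cannot keep up, start two or more.
On my Mac, I measured an average message delivery time of 32 microseconds on the Queue, and a maximum latency of 120 microseconds over 1 million messages.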
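To make the "start two or more" remark concrete, here is a rough sketch of several capture processes sharing one request queue. It assumes one -1 sentinel is sent per worker, and it adds a second queue, `done`, so the main process can see which events were handled. The names `image_capture`, `run_workers`, `done` and `STOP` are made up for this sketch, and the actual camera and disk code is left as a placeholder.

```python
from multiprocessing import Process, Queue

STOP = -1  # sentinel telling a worker to exit, as in the setup above


def image_capture(requests, done):
    """Worker loop: handle capture requests until the STOP sentinel arrives."""
    while True:
        event_id = requests.get()  # blocks until a request is available
        if event_id == STOP:
            break
        # Placeholder for the real camera grab and disk write.
        done.put(event_id)  # report which event was handled


def run_workers(event_ids, n_workers=2):
    """Start n_workers capture processes, feed them event_ids, return handled ids."""
    event_ids = list(event_ids)
    requests, done = Queue(), Queue()
    workers = [Process(target=image_capture, args=(requests, done))
               for _ in range(n_workers)]
    for w in workers:
        w.start()
    for event_id in event_ids:
        requests.put(event_id)
    for _ in workers:
        requests.put(STOP)  # one sentinel per worker
    # Drain the result queue before joining, so no worker blocks on a full pipe.
    handled = sorted(done.get() for _ in event_ids)
    for w in workers:
        w.join()
    return handled
```

A call such as run_workers(range(6), n_workers=2) has to sit under the if __name__ == "__main__": guard on Windows, just like main() above. Whichever idle worker calls get() first picks up the next request, so no extra scheduling logic is needed.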