Python 中二维阵列的连续波

Continuos saves of 2D arrays in Python

我正在编写一个程序,用于从 sCMOS(科学 CMOS)相机获取数据。由于计划以高帧率获取,我想在获取时保存到磁盘,从而增加在没有内存的情况下结束之前我可以记录的总时间。

有没有办法以二进制格式连续保存到同一个文件?理想情况下排除每帧制作一个文件的选项。

经过一段时间的修修补补,我找到了使用多线程模块解决这个问题的方法。这个想法是有两个进程运行,主进程获取数据,工作进程不断地保存到磁盘。要实现它,您需要定义一个队列,它将在进程之间以安全的方式共享数据。一旦一帧被保存,它就会释放内存。 重要的是使用多处理而不是线程。多处理实际上将进程分离到不同的 Python 解释器中。线程使用相同的解释器。因此,如果您的某个进程占用了您所在 运行 脚本的核心的 100%,事情就会停止。在我的应用程序中,这是至关重要的,因为它会显着改变帧速率。

注意:我正在使用 h5py 以 HDF5 格式保存文件,但您可以使用 numpy 等轻松调整代码以保存为纯文本文件

首先,我定义了稍后将发送到不同进程的辅助函数。输入是保存数据的文件和数据队列。无限循环是因为我没有在我决定之前退出的功能,即使队列是空的。退出标志只是一个传递给队列的字符串。

import h5py
from multiprocessing import Process, Queue

def workerSaver(fileData,q):
    """Function that can be run in a separate thread for continuously save data to disk.
    fileData -- STRING with the path to the file to use.
    q -- Queue that will store all the images to be saved to disk.
    """
    f = h5py.File(fileData, "w") # This will overwrite the file. Be sure to supply a new file path.

    allocate = 100 # Number of frames to allocate along the z-axis.
    keep_saving = True # Flag that will stop the worker function if running in a separate thread.
                       # Has to be submitted via the queue a string 'exit'
    i=0
    while keep_saving:
        while not q.empty():
            img = q.get()
            if i == 0: # First time it runs, creates the dataset
                x = img.shape[0]
                y = img.shape[1]
                dset = f.create_dataset('image', (x,y,allocate), maxshape=(x,y,None)) # The images are going to be stacked along the z-axis.
                                                                                 # The shape along the z axis will be increased as the number of images increase.
            if type(img)==type('exit'):
                keep_saving = False
            else:
                if i == dset.shape[2]:
                    dset.resize(i+allocate,axis=2)
                dset[:,:,i] = img
                i+=1
    f.close()

现在是代码的重要部分,我们在其中定义工作人员的行为。

import numpy as np
import time
fileData = 'path-to-file.dat'
# Queue of images. multiprocessing takes care of handling the data in and out
# and the sharing between parent and child processes.
q = Queue(0)
# Child process to save the data. It runs continuously until an exit flag
# is passed through the Queue. (q.put('exit'))
p = Process(target=workerSaver,args=(fileData,q,))
p.start()
example_image = np.ones((50,50))
for i in range(10000):
   q.put(example_image)
   print(q.qsize())
   time.sleep(0.01) # Sleep 10ms

q.put('Exit') # Any string would work
p.join()

检查进程 p 是否已启动,并且在我们开始填充队列 q 之前将 运行。当然有更聪明的方法来存储数据(例如以块的形式而不是每一个图像),但我已经检查并且磁盘处于全速写入,所以我不确定这方面是否有改进。确切地知道我们要保存的数据类型也有助于加快速度,特别是对于 HDF5(存储 8 位整数与存储 32 位整数不同)