仅在完全完成数据采集后才读取 hdf5 文件

Reading an hdf5 file only after it has completely finished acquiring data

数据将保存为hdf5文件,但保存一个文件总共需要大约30秒。一旦数据保存在一个 hdf5 文件中,该文件将立即使用,直到下一个 hdf5 文件完成,该过程将像这样继续。有没有一种简单的方法来检查 hd5 文件是否已完成加载,然后才能使用? hdf5 文件大约为 10-20MB,将全部保存在同一个文件夹中。当然,我也许可以将计时器设置为某种形式的 30 秒以上,但我有兴趣将时间保持得尽可能短,这意味着我需要确切地知道每个 hdf5 文件何时完成获取数据。

我有几个想法:

  1. 测量从一个时间点到另一个时间点的文件大小差异。如果没有变化,则认为文件已完成加载。
  2. 我不太了解 hdf5 文件,但也许每个 hdf5 文件的末尾都有一些东西,而且只有在末尾。如果是这样,我可以继续检查最后一个组件的值是否存在。如果存在,则文件必须完成。

有什么想法吗?如果有任何帮助,我将不胜感激。

编辑: 我对 hdf5 部分的想法 on_created:

class CustomHandler(FileSystemEventHandler):    

    def __init__(self, callback: Callable):
        self.callback = callback

        # Store callback to be called on every on_created event

    def on_created(self, event: Union[DirCreatedEvent, FileCreatedEvent]):
        #print(f"Event type: {event.event_type}\nAt: {event.src_path}\n")

        # check if it's File creation, not Directory creation
        if isinstance(event, FileCreatedEvent):
            file = pathlib.Path(event.src_path)

            #print(f"Processing file {file.name}\n")

            # call callback
            #self.callback(file)

            wait = 3
            max_wait = 30
            waited = 0

            while True:
                try:
                    h5py.File(self.callback(file), 'r')
                    return self.callback(file)

                except FileNotFoundError:
                    print('Error: HDF5 File not found')
                    return None

                except OSError:
                    if waited < max_wait:
                        print(f'Error: HDF5 File locked, sleeping {wait} seconds...')
                        time.sleep(wait)
                        waited += wait
                    else:
                        print(f'waited too long= {waited} secs')
                        return None

你要的是'file locking'。好消息:这在 HDF5 库构建中启用(默认情况下)。而且,更好的是,它在 h5py 包中启用!因此,如果您尝试打开一个由另一个程序打开以供写入的文件,您将得到一个异常。我们可以利用这个例外来发挥我们的优势。挑战在于将文件锁定异常与其他潜在的文件打开异常(如文件不存在)区分开来。

坦率地说,我更喜欢 Python 的 with/as: 上下文管理器来打开文件。但是,它以相同的方式处理所有异常(不打开并退出)。所以,我们需要一种方法来区别处理不同的异常。我怀疑自定义文件上下文管理器是执行此操作的最 Pythonic 方法。然而,这超出了我的专业范围。

相反,我写了一个你用文件名调用的函数它在 while 循环中使用 try/except: 打开文件。将发生以下 3 件事之一:

  1. 它 returns h5py 文件对象,如果它打开文件。
  2. 它立即returns None如果文件不存在。
  3. 如果它被锁定,它会休眠,然后重试。如果超过时间还打不开,返回None。

使用此功能时记得使用.close()方法!

代码已于 2021-09-09 更新,以使用 argparse 模块将 HDF5 文件名作为必需的命令行参数传递。
更新了以下代码:

import h5py
import argparse
import sys 
import time

def h5_open_wait(h5file):
    
    wait = 3
    max_wait = 30
    waited = 0

    while True:
        try:
            h5f = h5py.File(h5file,'r')
            return h5f
                
        except FileNotFoundError:
            print('Error: HDF5 File not found')
            return None
        
        except OSError:   
            if waited < max_wait:
                print(f'Error: HDF5 File locked, sleeping {wait} seconds...')
                time.sleep(wait) 
                waited += wait  
            else:
                print(f'waited too long= {waited} secs')
                return None

def get_job_options():

# Note that HDF5 file name is only parameter and is required; 
 
    parser = argparse.ArgumentParser(description='Check HDF5 file is available to open.')
    parser.add_argument('hdf5', help='HDF5 filename (Required)' )

    if len(sys.argv)==1:
    # display help message when no args are passed.
        parser.print_help()
        sys.exit('Error: No HDF5 file name specified; exiting.')

    args = parser.parse_args()
    
    HDF5_FILE = args.hdf5
    #print ('HDF5 file = %s' % args.hdf5)

    return (HDF5_FILE)

####################

h5file  = get_job_options()

start = time.time()

h5f = h5_open_wait(h5file)
if h5f is None:
    sys.exit('Error: HDF5 File not opened')
    
# do something with the file      
for ds, obj in h5f.items():
    print(f'ds name={ds}; shape={obj.shape}')

h5f.close()     
print(f'\nTime to read all datasets = {time.time()-start:.2f} secs')  

为了测试,我写了一个简单的程序,从一个大数组中创建 800 个数据集。 (下面的代码。)要测试,首先启动它,然后 运行 上面的代码以查看它是如何等待的。根据您的系统速度调整上面的 max_wait 和下面的 a0cnt

创建上面使用的示例文件的代码:

start = time.time()
a0 = 1000
cnt = 800
arr = np.random.random(a0*a0).reshape(a0,a0)
with h5py.File('SO_69067142.h5','w') as h5f:
    for dcnt in range(cnt):
        h5f.create_dataset(f'ds_{dcnt:03}',data=arr)

print(f'Time to create {cnt} datasets={time.time()-start:.2f}')   

根据您的评论和我们的讨论,最简单的实现可能是“等待”文件的函数,但不 return h5py 文件对象。这样您仍然可以使用标准上下文管理器:(例如,with h5py.File() as h5f:)并且避免需要在主程序中关闭文件。

我将修改后的函数发布为新答案(重命名为 h5_wait)以避免混淆(我的第一个答案具有原始函数 h5_open_wait)。此功能类似,但 return 是一个 True/False 标志而不是 h5py 文件对象。它通过调用 h5py.File() 检查文件状态,然后在退出函数之前关闭。它还使用 sys.argv 获取 HDF5 文件名(如 sys.argv[1])。

查看下面的新代码:

import h5py
import sys
import time

def h5_wait(h5file):
    
    wait = 3
    max_wait = 30
    waited = 0

    while True:
        try:
            h5f = h5py.File(h5file,'r')
            break
                
        except FileNotFoundError:
            print('\nError: HDF5 File not found\n')
            return False
        
        except OSError:   
            if waited < max_wait:
                print(f'Warning: HDF5 File locked, sleeping {wait} seconds...')
                time.sleep(wait) 
                waited += wait  
            else:
                print(f'\nWaited too long= {waited} secs, exiting...\n')
                return False

    h5f.close()
    return True

####################

if len(sys.argv) != 2:
    sys.exit('Include HDF5 file name on command line.')
h5file = sys.argv[1]         

h5stat = h5_wait(h5file)
if h5stat is False:
    sys.exit('Error: HDF5 File not available')
    
with h5py.File(h5file) as h5f:
    # do something with the file      
    start = time.time()
    for ds, obj in h5f.items():
        print(f'ds name={ds}; shape={obj.shape}')
      
    print(f'\nTime to read {len(list(h5f.keys()))} datasets = {time.time()-start:.2f} secs')