从 HDF5 迁移到 PostgreSQL

Moving from HDF5 to PostgreSQL

我有许多具有以下属性的大型一维 hdf5 数据集:

我想将它们移动到 PostgreSQL 中,我处理了数据库的结构和插入数据,但是每次填充 72.4mb hdf5 文件需要大约 650 秒,有人可以给我一个 tips/advice 我如何可以提高性能吗?

我现在拥有的:

def fill_database(self, dog):
    if isinstance(dog, h5py.Dataset):
        name = dog.name.split('/')
        table_name = '{}_{}'.format(name[3], name[5]) 
        data = dog.value.astype(int).tolist()
        self.cur.execute('CREATE TABLE IF NOT EXISTS {} (cur_id INT PRIMARY KEY , data INT[]);'.format(table_name))
        self.cur.execute('INSERT INTO {} VALUES (%s, %s)'.format(table_name), (name[2], data))

        if isinstance(dog, h5py.Group):
            for k, v in dict(dog).items():
                self.fill_database(v)

我尝试了什么:

import psycopg2
import h5py
from itertools import islice


with h5py.File('full_db.hdf5') as hdf5file:
    with psycopg2.connect(database='hdf5', user='postgres', password='pass', port=5432) as conn:
        cur = conn.cursor()
        cur.execute('drop table if EXISTS mytable;')
        cur.execute('create table mytable (data INT[]);')
        chunksize = 10000
        t = iter(hdf5file["Group"]["1"]["Group1"]["2"]["Dataset"][:].astype(int))
        rows = islice(t, chunksize)
        while rows:
            statement = "INSERT INTO mytable(data) VALUES {}".format(rows) # I stuck here
            cur.execute(statement)
            rows = islice(t, chunksize)
        conn.commit()

我还尝试在 PostgreSQL 中使用 LIMIT 和许多其他方式做一些事情,但我没有成功。

我觉得部分问题可能是因为数据库中的数组,为了以后输出更方便,我用到了

差不多两周后,我想我可以回答我自己的问题了。 为了寻找答案,我在互联网上看到了这个页面 https://github.com/psycopg/psycopg2/issues/179 同样在阅读文档之后,我了解到从文件中复制的速度更快,因此我尝试使用 StringIO 模块。这就是我得到的:

import h5py
import psycopg2
import time
from io import StringIO

conn = psycopg2.connect(database='hdf5', user='postgres', password=' ')
cur = conn.cursor()

file = h5py.File('db.hdf5', 'r')
data_set = file['path/to/large/data_set'].value.astype(int).tolist()

cur.execute('DROP TABLE IF EXISTS table_test;')
cur.execute('CREATE TABLE table_test (data INTEGER[]);')

# ORIGINAL
start = time.time()
cur.execute('INSERT INTO table_test VALUES (%s);', (data_set,))
print('Original: {} sec'.format(round(time.time() - start, 2)))

# STRING FORMAT
start = time.time()
data_str = ','.join(map(str, data_set)).replace('[', '{').replace(']', '}')
cur.execute('INSERT INTO table_test VALUES (ARRAY[{}]);'.format(data_str))
print('String format: {} sec'.format(round(time.time() - start, 2)))

# STRING IO COPY
start = time.time()
data_str = ','.join(map(str, data_set)).replace('[', '{').replace(']', '}')
data_io = StringIO('{{{}}}'.format(data_str))
cur.copy_from(data_io, 'table_test')
print('String IO: {} sec'.format(round(time.time() - start, 2)))

conn.commit()

这给了我下一个结果,数据集的形状为 (1200201,):

Original: 1.27 sec
String format: 0.58 sec
String IO: 0.3 sec