使用新列(时间序列技术指标)动态更新现有的 Postgres table (psycopg2)
Dynamically updating existing Postgres table (psycopg2) with new columns (time series technical indicators)
我正在处理股票的金融时间序列数据 (OHLCV),并使用金融指标(例如布林带)扩充我的数据集。我有一个 Postgres/TimescaleDB hypertable 设置并计划使用 psycopg2 来 运行 查询。
我想做的是能够自动和动态地向我现有的数据库 table 添加列,这些列具有与现有 python 库/模块不同的技术指标。
举个例子,我有一个很长的 table 列,其中包含每日 OHLCV 和代码数据,以便稍后进行 ETL 处理。我想要 运行 数量尚未确定的指标,例如布林带(上限和下限)、RSI、ema 等。但不想在 [= 中手动编码这些新列20=] 每次我创建一个新指标时通过 psycopg2,而是将它们作为参数输入以在数据库中自动设置 - 作为迁移/新 table,或在现有每日 table. (最好的)
我环顾四周,但还没有看到任何我认为可能对此有用的东西。
我建议不要反复修改您的 table,而是以不同的方式对数据建模。例如,您可以创建以下 tables
-- Table for the original series
CREATE TABLE series (
id INTEGER PRIMARY KEY,
datetime TIMESTAMP NOT NULL,
symbol TEXT NOT NULL,
value NUMBER NOT NULL
);
-- Table for the derived indicators
CREATE TABLE indicators (
id INTEGER PRIMARY KEY,
series_id INTEGER NOT NULL,
name TEXT NOT NULL,
value NUMBER NOT NULL,
FOREIGN KEY (series_id) REFERENCES series(id)
);
然后您可以为每个原始系列计算任意数量的指标(存储在 series
中)并将派生值存储在 indicators
中。这种方法还有一个额外的好处,即您不需要评估所有时间序列的所有指标(或者当您不评估所有数据点的所有指标时有一堆 NULL
列)。
例如1,
import numpy as np
import psycopg2
SYMBOL = "GOOG"
INDICATOR = "EMA"
with psycopg2.connect("your connection string") as conn:
# Get the data
with conn.cursor() as cur:
cur.execute("SELECT id, value FROM series WHERE symbol = %s", (SYMBOL,))
ids, values = np.transpose(cur.fetchall())
# Calculate the indicator
indicator_values = evaluate_indicator(values, INDICATOR)
# Insert the indicator values
with conn.cursor() as cur:
data = [(series_id, INDICATOR, value) for series_id, value in
zip(ids, indicator_values)]
cur.executemany("INSERT INTO indicators (series_id, name, value) VALUES (%s, %s, %s)",
data)
话虽如此,如果您想继续扩展 table.[=17 的架构,则可以在计算每个新指标之前使用 ALTER TABLE
command 创建一个新列=]
1 我还没有测试过这段代码,所以它可能包含一些小问题。
我正在处理股票的金融时间序列数据 (OHLCV),并使用金融指标(例如布林带)扩充我的数据集。我有一个 Postgres/TimescaleDB hypertable 设置并计划使用 psycopg2 来 运行 查询。
我想做的是能够自动和动态地向我现有的数据库 table 添加列,这些列具有与现有 python 库/模块不同的技术指标。
举个例子,我有一个很长的 table 列,其中包含每日 OHLCV 和代码数据,以便稍后进行 ETL 处理。我想要 运行 数量尚未确定的指标,例如布林带(上限和下限)、RSI、ema 等。但不想在 [= 中手动编码这些新列20=] 每次我创建一个新指标时通过 psycopg2,而是将它们作为参数输入以在数据库中自动设置 - 作为迁移/新 table,或在现有每日 table. (最好的)
我环顾四周,但还没有看到任何我认为可能对此有用的东西。
我建议不要反复修改您的 table,而是以不同的方式对数据建模。例如,您可以创建以下 tables
-- Table for the original series
CREATE TABLE series (
id INTEGER PRIMARY KEY,
datetime TIMESTAMP NOT NULL,
symbol TEXT NOT NULL,
value NUMBER NOT NULL
);
-- Table for the derived indicators
CREATE TABLE indicators (
id INTEGER PRIMARY KEY,
series_id INTEGER NOT NULL,
name TEXT NOT NULL,
value NUMBER NOT NULL,
FOREIGN KEY (series_id) REFERENCES series(id)
);
然后您可以为每个原始系列计算任意数量的指标(存储在 series
中)并将派生值存储在 indicators
中。这种方法还有一个额外的好处,即您不需要评估所有时间序列的所有指标(或者当您不评估所有数据点的所有指标时有一堆 NULL
列)。
例如1,
import numpy as np
import psycopg2
SYMBOL = "GOOG"
INDICATOR = "EMA"
with psycopg2.connect("your connection string") as conn:
# Get the data
with conn.cursor() as cur:
cur.execute("SELECT id, value FROM series WHERE symbol = %s", (SYMBOL,))
ids, values = np.transpose(cur.fetchall())
# Calculate the indicator
indicator_values = evaluate_indicator(values, INDICATOR)
# Insert the indicator values
with conn.cursor() as cur:
data = [(series_id, INDICATOR, value) for series_id, value in
zip(ids, indicator_values)]
cur.executemany("INSERT INTO indicators (series_id, name, value) VALUES (%s, %s, %s)",
data)
话虽如此,如果您想继续扩展 table.[=17 的架构,则可以在计算每个新指标之前使用 ALTER TABLE
command 创建一个新列=]
1 我还没有测试过这段代码,所以它可能包含一些小问题。