从代码数据生成 CandleSticks
Generate CandleSticks from ticker data
我有一个算法可以将股票行情数据转换为 CandleSticks。
我有一个多次调用此函数的代码,我正在尝试优化该函数以使其 运行 更快。
所以我想让你阅读代码并给我建议如何让它更快
为了这个问题,您可以将市场报价器视为两个列表。
某只股票的价格列表
stock_price = [ 5, 5.1, 5, 4.9 , ... ]
以及与每个价格相关的时间戳列表。
timestamps = [ 1534339504.36133 , 1534339704.36133, 1534339804.36133, 1534340504.36133, ... ]
您会注意到采样率是可变的,有时可能是几秒钟,有时可能是几分钟。输入列表按递增时间戳排序。
所以我给出了 N 个我想要计算的蜡烛。每支持续时间 T 的蜡烛。如果我要求 10 支持续时间为 5 分钟的蜡烛,但我没有足够的时间戳,第一支蜡烛将是 NAN。另一方面,如果我有大量过去几周的时间戳,则只会考虑最后的样本来计算最后 10 根蜡烛,其余的将被忽略。
还有一个细节。我以稍微不同的方式计算蜡烛。
通常,它们参考 UTC,我认为列表中的最后一个元素是我最后一根蜡烛的收盘价和时间
最后,我需要包含蜡烛的开盘价、最高价、最低价、收盘价以及时间间隔为 T
的 N 根蜡烛的时间的列表或 numpy 数组
因此,为了将这两个列表转换为蜡烛图,我执行以下操作
# time_interval is the size of the candle: 1, 5, 10... minutes, hours, etc
# nb_candles is the number of candles that I want to extract ( for example the last 5 candles )
def convert_samples_to_candles( stock_price , times , time_interval , nb_candles=-1 ):
#If no data return NaNs
if( len(stock_price) == 0 or len(times) == 0 ):
NO_RESPONSE = [np.NaN]
return NO_RESPONSE, NO_RESPONSE, NO_RESPONSE, NO_RESPONSE, NO_RESPONSE
last_time = times[-1]
last_val = stock_price[-1]
#if nb_candles is not specified compute all the candles
if( nb_candles==-1 ):
nb_candles = int((last_time - times[0])/time_interval) + 1
candles_open = [np.NaN]*nb_candles
candles_close = [np.NaN]*nb_candles
candles_high = [np.NaN]*nb_candles
candles_low = [np.NaN]*nb_candles
candles_time = [np.NaN]*nb_candles
k=1
last_candle = -1
#Initialize the last candles with the last value
candles_open[-1] = last_val
candles_close[-1] = last_val
candles_high[-1] = last_val
candles_low[-1] = last_val
candles_time[-1] = last_time
#Iterate and fill each candle from the last one to the first one
nb_times = len(times)
while( k < nb_times and times[-1*k] + nb_candles*time_interval > last_time ):
a_last = stock_price[-1*k]
a_timestamp = times[-1*k]
candle_index = (-1*int((last_time - a_timestamp)/time_interval) -1)
if( candle_index > -1 ):
k += 1
continue
if( candle_index < last_candle ):
candles_time[ candle_index ] = a_timestamp
candles_close[ candle_index ] = a_last
candles_high[ candle_index ] = a_last
candles_low[ candle_index ] = a_last
candles_open[ candle_index ] = a_last
last_candle = candle_index
else:
#print candle_index, candles_open
candles_open[ candle_index ] = a_last
if( candles_high[ candle_index ] < a_last ):
candles_high[ candle_index ] = a_last
if( candles_low[ candle_index ] > a_last ):
candles_low[ candle_index ] = a_last
k += 1
return candles_open, candles_close, candles_high, candles_low, candles_time
非常感谢您的宝贵时间!
因此,经过一些研究后,我尝试提供一种不同的方法来计算蜡烛图。
我定义了一个 Candle_Handler class 并且我迭代地插入样本,并更新蜡烛图。
当您迭代地重新计算蜡烛图时,此代码比问题中的代码稍快。
class Candle_Handler( ):
def __init__(self, time_interval, nb_candles=5 ):
self.nb_candles = nb_candles
self.time_interval = time_interval
self.times = []
self.values = []
self.candles_t = [ [] for _ in range(nb_candles) ]
self.candles_v = [ [] for _ in range(nb_candles) ]
def insert_sample( self, value, time ):
self.candles_t[-1].append(time)
self.candles_v[-1].append(value)
for i in range( self.nb_candles ):
candle_index = -1*(i+1)
if( len(self.candles_t[candle_index]) == 0 ): continue
candle_time_interval = (i+1)*self.time_interval
if( i + 1 == self.nb_candles ):
while( len(self.candles_t[candle_index])> 0 and time - self.candles_t[candle_index][0] > candle_time_interval ):
del self.candles_t[candle_index][0]
del self.candles_v[candle_index][0]
else:
while( len(self.candles_t[candle_index])> 0 and time - self.candles_t[candle_index][0] > candle_time_interval ):
ltime = self.candles_t[candle_index].pop(0)
lvalue = self.candles_v[candle_index].pop(0)
self.candles_t[candle_index-1].append( ltime )
self.candles_v[candle_index-1].append( lvalue )
def get_all_candles(self, delta=1.0 ):
last_time = self.candles_t[-1][-1]
candles_open = [ c[0] if len(c)>0 else np.NAN for c in self.candles_v ]
candles_close = [ c[-1] if len(c)>0 else np.NAN for c in self.candles_v ]
candles_high = [ max(c) if len(c)>0 else np.NAN for c in self.candles_v ]
candles_low = [ min(c) if len(c)>0 else np.NAN for c in self.candles_v ]
#candles_time = [ c[-1] if len(c)>0 else np.NAN for c in self.candles_t ]
candles_time = [ last_time - (self.nb_candles - (c+1) )*self.time_interval for c in range(self.nb_candles) ]
for i in range( 1, self.nb_candles ):
if( np.isnan( candles_close[i-1] ) ): continue
if( np.isnan( candles_open[i] ) ):
candles_open[i] = candles_close[i-1]
candles_close[i] = candles_close[i-1]
candles_high[i] = candles_close[i-1]
candles_low[i] = candles_close[i-1]
if( not delta == 1.0 ):
candles_close[-1] = candles_close[-1]*delta
if( candles_high[-1] < candles_close[-1] ):
candles_high[-1] = candles_close[-1]
if( candles_low[-1] > candles_close[-1] ):
candles_low[-1] = candles_close[-1]
if( len(self.candles_v[-1]) == 1 ):
candles_open[-1] = candles_close[-1]
return candles_open, candles_close, candles_high, candles_low, candles_time
我有一个算法可以将股票行情数据转换为 CandleSticks。 我有一个多次调用此函数的代码,我正在尝试优化该函数以使其 运行 更快。 所以我想让你阅读代码并给我建议如何让它更快
为了这个问题,您可以将市场报价器视为两个列表。 某只股票的价格列表
stock_price = [ 5, 5.1, 5, 4.9 , ... ]
以及与每个价格相关的时间戳列表。
timestamps = [ 1534339504.36133 , 1534339704.36133, 1534339804.36133, 1534340504.36133, ... ]
您会注意到采样率是可变的,有时可能是几秒钟,有时可能是几分钟。输入列表按递增时间戳排序。
所以我给出了 N 个我想要计算的蜡烛。每支持续时间 T 的蜡烛。如果我要求 10 支持续时间为 5 分钟的蜡烛,但我没有足够的时间戳,第一支蜡烛将是 NAN。另一方面,如果我有大量过去几周的时间戳,则只会考虑最后的样本来计算最后 10 根蜡烛,其余的将被忽略。
还有一个细节。我以稍微不同的方式计算蜡烛。 通常,它们参考 UTC,我认为列表中的最后一个元素是我最后一根蜡烛的收盘价和时间
最后,我需要包含蜡烛的开盘价、最高价、最低价、收盘价以及时间间隔为 T
的 N 根蜡烛的时间的列表或 numpy 数组因此,为了将这两个列表转换为蜡烛图,我执行以下操作
# time_interval is the size of the candle: 1, 5, 10... minutes, hours, etc
# nb_candles is the number of candles that I want to extract ( for example the last 5 candles )
def convert_samples_to_candles( stock_price , times , time_interval , nb_candles=-1 ):
#If no data return NaNs
if( len(stock_price) == 0 or len(times) == 0 ):
NO_RESPONSE = [np.NaN]
return NO_RESPONSE, NO_RESPONSE, NO_RESPONSE, NO_RESPONSE, NO_RESPONSE
last_time = times[-1]
last_val = stock_price[-1]
#if nb_candles is not specified compute all the candles
if( nb_candles==-1 ):
nb_candles = int((last_time - times[0])/time_interval) + 1
candles_open = [np.NaN]*nb_candles
candles_close = [np.NaN]*nb_candles
candles_high = [np.NaN]*nb_candles
candles_low = [np.NaN]*nb_candles
candles_time = [np.NaN]*nb_candles
k=1
last_candle = -1
#Initialize the last candles with the last value
candles_open[-1] = last_val
candles_close[-1] = last_val
candles_high[-1] = last_val
candles_low[-1] = last_val
candles_time[-1] = last_time
#Iterate and fill each candle from the last one to the first one
nb_times = len(times)
while( k < nb_times and times[-1*k] + nb_candles*time_interval > last_time ):
a_last = stock_price[-1*k]
a_timestamp = times[-1*k]
candle_index = (-1*int((last_time - a_timestamp)/time_interval) -1)
if( candle_index > -1 ):
k += 1
continue
if( candle_index < last_candle ):
candles_time[ candle_index ] = a_timestamp
candles_close[ candle_index ] = a_last
candles_high[ candle_index ] = a_last
candles_low[ candle_index ] = a_last
candles_open[ candle_index ] = a_last
last_candle = candle_index
else:
#print candle_index, candles_open
candles_open[ candle_index ] = a_last
if( candles_high[ candle_index ] < a_last ):
candles_high[ candle_index ] = a_last
if( candles_low[ candle_index ] > a_last ):
candles_low[ candle_index ] = a_last
k += 1
return candles_open, candles_close, candles_high, candles_low, candles_time
非常感谢您的宝贵时间!
因此,经过一些研究后,我尝试提供一种不同的方法来计算蜡烛图。
我定义了一个 Candle_Handler class 并且我迭代地插入样本,并更新蜡烛图。
当您迭代地重新计算蜡烛图时,此代码比问题中的代码稍快。
class Candle_Handler( ):
def __init__(self, time_interval, nb_candles=5 ):
self.nb_candles = nb_candles
self.time_interval = time_interval
self.times = []
self.values = []
self.candles_t = [ [] for _ in range(nb_candles) ]
self.candles_v = [ [] for _ in range(nb_candles) ]
def insert_sample( self, value, time ):
self.candles_t[-1].append(time)
self.candles_v[-1].append(value)
for i in range( self.nb_candles ):
candle_index = -1*(i+1)
if( len(self.candles_t[candle_index]) == 0 ): continue
candle_time_interval = (i+1)*self.time_interval
if( i + 1 == self.nb_candles ):
while( len(self.candles_t[candle_index])> 0 and time - self.candles_t[candle_index][0] > candle_time_interval ):
del self.candles_t[candle_index][0]
del self.candles_v[candle_index][0]
else:
while( len(self.candles_t[candle_index])> 0 and time - self.candles_t[candle_index][0] > candle_time_interval ):
ltime = self.candles_t[candle_index].pop(0)
lvalue = self.candles_v[candle_index].pop(0)
self.candles_t[candle_index-1].append( ltime )
self.candles_v[candle_index-1].append( lvalue )
def get_all_candles(self, delta=1.0 ):
last_time = self.candles_t[-1][-1]
candles_open = [ c[0] if len(c)>0 else np.NAN for c in self.candles_v ]
candles_close = [ c[-1] if len(c)>0 else np.NAN for c in self.candles_v ]
candles_high = [ max(c) if len(c)>0 else np.NAN for c in self.candles_v ]
candles_low = [ min(c) if len(c)>0 else np.NAN for c in self.candles_v ]
#candles_time = [ c[-1] if len(c)>0 else np.NAN for c in self.candles_t ]
candles_time = [ last_time - (self.nb_candles - (c+1) )*self.time_interval for c in range(self.nb_candles) ]
for i in range( 1, self.nb_candles ):
if( np.isnan( candles_close[i-1] ) ): continue
if( np.isnan( candles_open[i] ) ):
candles_open[i] = candles_close[i-1]
candles_close[i] = candles_close[i-1]
candles_high[i] = candles_close[i-1]
candles_low[i] = candles_close[i-1]
if( not delta == 1.0 ):
candles_close[-1] = candles_close[-1]*delta
if( candles_high[-1] < candles_close[-1] ):
candles_high[-1] = candles_close[-1]
if( candles_low[-1] > candles_close[-1] ):
candles_low[-1] = candles_close[-1]
if( len(self.candles_v[-1]) == 1 ):
candles_open[-1] = candles_close[-1]
return candles_open, candles_close, candles_high, candles_low, candles_time