如何获取盈透证券API的历史股价数据?
How to get historical stock price data from interactive brokers API?
我想使用 IBAPI 获取历史股票数据。我的代码不起作用,returns 什么都没有。有人可以帮我编辑代码吗?谢谢
from ibapi import client
from ibapi import wrapper
import datetime
from ibapi.contract import Contract
from ibapi.common import BarData
# ! [socket_init]
class App(wrapper.EWrapper,client.EClient):
def __init__(self):
wrapper.EWrapper.__init__(self)
client.EClient.__init__(self, wrapper=self)
#Build a sample contract
contract = Contract();
contract.symbol = "9005.T";
contract.secType = "STK";
contract.currency = "JPY";
contract.exchange = "SMART";
app = App()
app.connect(host='localhost',port=7497, clientId=3)
print(app.isConnected())
queryTime = (datetime.datetime.today() - datetime.timedelta(days=180)).strftime("%Y%m%d %H:%M:%S")
print(app.reqHistoricalData(4102, contract, queryTime,"1 M", "1 day", "MIDPOINT", 1, 1, False, []))
queryTime = (datetime.datetime.today() - datetime.timedelta(days=180)).strftime("%Y%m%d %H:%M:%S")
print(app.historicalData(4102,BarData))
OUTPUT:
True
None
None
我试着阅读了源代码。但我发现这对我来说很难理解。其他帖子显示的答案似乎与最新的 api 版本无关。
这是我会做的。
class App(wrapper.EWrapper,client.EClient):
如果我想重写它的任何方法,我只会继承 EClient
。我不常使用 python 但在其他语言中也是如此。
在 class App
中,您需要重写您感兴趣的方法,例如 historicalData
在 app.connect
之后,您必须调用 app.run()
来启动它的消息 reader 线程。一旦该线程取得控制权,它将阻塞您的程序,因此您必须异步执行程序流。
我会在评论中添加数字,以便您查看流程。
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
from ibapi.common import BarData
import datetime
class MyWrapper(EWrapper):
def nextValidId(self, orderId:int):
#4 first message received is this one
print("setting nextValidOrderId: %d", orderId)
self.nextValidOrderId = orderId
#5 start requests here
self.start()
def historicalData(self, reqId:int, bar: BarData):
#7 data is received for every bar
print("HistoricalData. ReqId:", reqId, "BarData.", bar)
def historicalDataEnd(self, reqId: int, start: str, end: str):
#8 data is finished
print("HistoricalDataEnd. ReqId:", reqId, "from", start, "to", end)
#9 this is the logical end of your program
app.disconnect()
print("finished")
def error(self, reqId, errorCode, errorString):
# these messages can come anytime.
print("Error. Id: " , reqId, " Code: " , errorCode , " Msg: " , errorString)
def start(self):
queryTime = (datetime.datetime.today() - datetime.timedelta(days=180)).strftime("%Y%m%d %H:%M:%S")
fx = Contract()
fx.secType = "CASH"
fx.symbol = "USD"
fx.currency = "JPY"
fx.exchange = "IDEALPRO"
#6 request data, using fx since I don't have Japanese data
app.reqHistoricalData(4102, fx, queryTime,"1 M", "1 day", "MIDPOINT", 1, 1, False, [])
app = EClient(MyWrapper()) #1 create wrapper subclass and pass it to EClient
app.connect("127.0.0.1", 7497, clientId=123) #2 connect to TWS/IBG
app.run() #3 start message thread
我想使用 IBAPI 获取历史股票数据。我的代码不起作用,returns 什么都没有。有人可以帮我编辑代码吗?谢谢
from ibapi import client
from ibapi import wrapper
import datetime
from ibapi.contract import Contract
from ibapi.common import BarData
# ! [socket_init]
class App(wrapper.EWrapper,client.EClient):
def __init__(self):
wrapper.EWrapper.__init__(self)
client.EClient.__init__(self, wrapper=self)
#Build a sample contract
contract = Contract();
contract.symbol = "9005.T";
contract.secType = "STK";
contract.currency = "JPY";
contract.exchange = "SMART";
app = App()
app.connect(host='localhost',port=7497, clientId=3)
print(app.isConnected())
queryTime = (datetime.datetime.today() - datetime.timedelta(days=180)).strftime("%Y%m%d %H:%M:%S")
print(app.reqHistoricalData(4102, contract, queryTime,"1 M", "1 day", "MIDPOINT", 1, 1, False, []))
queryTime = (datetime.datetime.today() - datetime.timedelta(days=180)).strftime("%Y%m%d %H:%M:%S")
print(app.historicalData(4102,BarData))
OUTPUT:
True
None
None
我试着阅读了源代码。但我发现这对我来说很难理解。其他帖子显示的答案似乎与最新的 api 版本无关。
这是我会做的。
class App(wrapper.EWrapper,client.EClient):
如果我想重写它的任何方法,我只会继承 EClient
。我不常使用 python 但在其他语言中也是如此。
在 class App
中,您需要重写您感兴趣的方法,例如 historicalData
在 app.connect
之后,您必须调用 app.run()
来启动它的消息 reader 线程。一旦该线程取得控制权,它将阻塞您的程序,因此您必须异步执行程序流。
我会在评论中添加数字,以便您查看流程。
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
from ibapi.common import BarData
import datetime
class MyWrapper(EWrapper):
def nextValidId(self, orderId:int):
#4 first message received is this one
print("setting nextValidOrderId: %d", orderId)
self.nextValidOrderId = orderId
#5 start requests here
self.start()
def historicalData(self, reqId:int, bar: BarData):
#7 data is received for every bar
print("HistoricalData. ReqId:", reqId, "BarData.", bar)
def historicalDataEnd(self, reqId: int, start: str, end: str):
#8 data is finished
print("HistoricalDataEnd. ReqId:", reqId, "from", start, "to", end)
#9 this is the logical end of your program
app.disconnect()
print("finished")
def error(self, reqId, errorCode, errorString):
# these messages can come anytime.
print("Error. Id: " , reqId, " Code: " , errorCode , " Msg: " , errorString)
def start(self):
queryTime = (datetime.datetime.today() - datetime.timedelta(days=180)).strftime("%Y%m%d %H:%M:%S")
fx = Contract()
fx.secType = "CASH"
fx.symbol = "USD"
fx.currency = "JPY"
fx.exchange = "IDEALPRO"
#6 request data, using fx since I don't have Japanese data
app.reqHistoricalData(4102, fx, queryTime,"1 M", "1 day", "MIDPOINT", 1, 1, False, [])
app = EClient(MyWrapper()) #1 create wrapper subclass and pass it to EClient
app.connect("127.0.0.1", 7497, clientId=123) #2 connect to TWS/IBG
app.run() #3 start message thread