如何在 python 上获取 ibapi 中的一系列请求

how to get series of requests in ibapi on python

我是 Python 和 ibapi 的新手,我问过 Interactive Broker 本身,他们只告诉我 :

The self.done is originally defined in the parent class EClient that is used to indicate the current API connection is "done". You should not use it as an indicator to stop a loop that is build by yourself. You can keep one API client connection live until you have finished downloaded all the historical data you need. If you need to slow down the pace in requesting data, you can use other thread control in python, such as the sleep() function, to add more wait time in your loop so it does not send historical data requests for all contracts at once.

所以我认为错误来自

self.done = True

这是我的代码

from ibapi import wrapper
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract as IBcontract
from threading import Thread
import queue
import datetime
from ibapi.utils import iswrapper #just for decorator
from ibapi.common import *
from ibapi.contract import *
from ibapi.ticktype import *

class TestApp(wrapper.EWrapper, EClient):

def __init__(self):
    wrapper.EWrapper.__init__(self)
    EClient.__init__(self, wrapper=self)


@iswrapper
def historicalData(self, reqId:int, bar: BarData):
    print("HistoricalData. ", reqId, " Date:", bar.date, "Open:", bar.open,
          "High:", bar.high, "Low:", bar.low, "Close:", bar.close, "Volume:", bar.volume,
          "Count:", bar.barCount, "WAP:", bar.average)
    if  all_data.iloc[reqId,7] == 0:
        all_data.iloc[reqId,7] = bar.close
    self.done = True

@iswrapper
def historicalDataEnd(self, reqId: int, start: str, end: str):
    super().historicalDataEnd(reqId, start, end)
    print("HistoricalDataEnd ", reqId, "from", start, "to", end)

@iswrapper
def historicalDataUpdate(self, reqId: int, bar: BarData):
    print("HistoricalDataUpdate. ", reqId, " Date:", bar.date, "Open:", bar.open,
          "High:", bar.high, "Low:", bar.low, "Close:", bar.close, "Volume:", bar.volume,
          "Count:", bar.barCount, "WAP:", bar.average)

def main():
    t = time()
    max_amount_per_Iter = 70 #max number per iter to save cost
    max_Iter = ceil(len(all_data)/max_amount_per_Iter)
    for i in range (0,max_Iter):
        print('====================round : ',i+1,'===============================')
        app = TestApp()
        app.connect("127.0.0.1", 7496, clientId=i)
        print("serverVersion:%s connectionTime:%s" % (app.serverVersion(),app.twsConnectionTime()))
        for j in range (0,min(max_amount_per_Iter,len(all_data)-i*max_amount_per_Iter)):
            print(j+i*70)
        app.i = j+i*max_amount_per_Iter

        contract = Contract()
        contract.symbol = all_data.iloc[app.i,0]
        contract.secType = all_data.iloc[app.i,1]
        contract.currency = all_data.iloc[app.i,3]
        contract.exchange = all_data.iloc[app.i,2]            

        app.reqHistoricalData(app.i, contract, "","1 W", "1 day", "Adjusted_Last", 1, 1, False, []) 

        app.run()

    sleep(1)
    app.disconnect()
    sleep(0.02)
    print('=========End round : ',i+1,'with time :',time() - t,'==============')

if __name__ == "__main__":
    main()

我希望看到如何在不使用 self.done 或断开我的 TWS 的情况下传递到下一个迭代的建议。如果我不包含 self.done 程序将在第一个 i=0,j=0 迭代时无限循环 运行,没有人告诉它退出。

所以如果你不知道能解决我的直接答案,你可以建议:

  1. 在这种情况下是否可以使用其他流量控制?

  2. 或者 cancelHistoricalData 命令,是否可以用于这种情况?

  3. 或者有什么方法可以在 Class 中构建 Class 并仅在子类中执行 self.done = True 而不会在 [=48 中断开我的 ID =]?

PS。我对 requestMarketData

有同样的问题

更新V2 这是我尝试为

打印的一些结果
====================round :  1 ===============================
step2
serverVersion:124 connectionTime:b'20170821 22:34:09 ICT'
0
ERROR:root:ERROR -1 2104 Market data farm connection is OK:hfarm
ERROR:root:ERROR -1 2104 Market data farm connection is OK:jfarm
ERROR:root:ERROR -1 2104 Market data farm connection is OK:usfuture
ERROR:root:ERROR -1 2104 Market data farm connection is OK:eufarm
ERROR:root:ERROR -1 2104 Market data farm connection is OK:cashfarm
ERROR:root:ERROR -1 2104 Market data farm connection is OK:usfarm.us
ERROR:root:ERROR -1 2104 Market data farm connection is OK:usfarm
ERROR:root:ERROR -1 2106 HMDS data farm connection is OK:ilhmds
ERROR:root:ERROR -1 2106 HMDS data farm connection is OK:euhmds
ERROR:root:ERROR -1 2106 HMDS data farm connection is OK:fundfarm
ERROR:root:ERROR -1 2106 HMDS data farm connection is OK:ushmds
HistoricalData.  0  Date: 20170815 Open: 67.2 High: 68.24 Low: 66.99 Close: 68.02 Volume: 13268 Count: 9453 WAP: 67.8105
HistoricalData.  0  Date: 20170816 Open: 68.28 High: 68.91 Low: 67.45 Close: 68.01 Volume: 11950 Count: 9723 WAP: 68.457
HistoricalData.  0  Date: 20170817 Open: 67.8 High: 68.53 Low: 66.83 Close: 66.89 Volume: 11407 Count: 9432 WAP: 67.504
HistoricalData.  0  Date: 20170818 Open: 66.91 High: 67.25 Low: 66.57 Close: 66.78 Volume: 12091 Count: 9637 WAP: 66.8445
HistoricalData.  0  Date: 20170821 Open: 66.9 High: 66.96 Low: 66.14 Close: 66.28 Volume: 3317 Count: 2541 WAP: 66.3425
HistoricalDataEnd  0 from 20170814  22:34:14 to 20170821  22:34:14
ERROR:root:ERROR 1 504 Not connected
ERROR:root:ERROR 2 504 Not connected
ERROR:root:ERROR 3 504 Not connected
ERROR:root:ERROR 4 504 Not connected
ERROR:root:ERROR 5 504 Not connected
ERROR:root:ERROR 6 504 Not connected
ERROR:root:ERROR 7 504 Not connected
ERROR:root:ERROR 8 504 Not connected
ERROR:root:ERROR 9 504 Not connected
ERROR:root:ERROR 10 504 Not connected
ERROR:root:ERROR 11 504 Not connected
ERROR:root:ERROR 12 504 Not connected
ERROR:root:ERROR 13 504 Not connected
ERROR:root:ERROR 14 504 Not connected
ERROR:root:ERROR 15 504 Not connected
ERROR:root:ERROR 16 504 Not connected
ERROR:root:ERROR 17 504 Not connected
ERROR:root:ERROR 18 504 Not connected
ERROR:root:ERROR 19 504 Not connected
ERROR:root:ERROR 20 504 Not connected
ERROR:root:ERROR 21 504 Not connected
ERROR:root:ERROR 22 504 Not connected
ERROR:root:ERROR 23 504 Not connected
ERROR:root:ERROR 24 504 Not connected
ERROR:root:ERROR 25 504 Not connected
ERROR:root:ERROR 26 504 Not connected
ERROR:root:ERROR 27 504 Not connected
ERROR:root:ERROR 28 504 Not connected
ERROR:root:ERROR 29 504 Not connected
ERROR:root:ERROR 30 504 Not connected
ERROR:root:ERROR 31 504 Not connected
ERROR:root:ERROR 32 504 Not connected
ERROR:root:ERROR 33 504 Not connected
ERROR:root:ERROR 34 504 Not connected
ERROR:root:ERROR 35 504 Not connected
ERROR:root:ERROR 36 504 Not connected
ERROR:root:ERROR 37 504 Not connected
ERROR:root:ERROR 38 504 Not connected
ERROR:root:ERROR 39 504 Not connected
ERROR:root:ERROR 40 504 Not connected
ERROR:root:ERROR 41 504 Not connected
ERROR:root:ERROR 42 504 Not connected
ERROR:root:ERROR 43 504 Not connected
ERROR:root:ERROR 44 504 Not connected
ERROR:root:ERROR 45 504 Not connected
ERROR:root:ERROR 46 504 Not connected
ERROR:root:ERROR 47 504 Not connected
ERROR:root:ERROR 48 504 Not connected
ERROR:root:ERROR 49 504 Not connected
ERROR:root:ERROR 50 504 Not connected
ERROR:root:ERROR 51 504 Not connected
ERROR:root:ERROR 52 504 Not connected
ERROR:root:ERROR 53 504 Not connected
ERROR:root:ERROR 54 504 Not connected
ERROR:root:ERROR 55 504 Not connected
ERROR:root:ERROR 56 504 Not connected
ERROR:root:ERROR 57 504 Not connected
ERROR:root:ERROR 58 504 Not connected
ERROR:root:ERROR 59 504 Not connected
ERROR:root:ERROR 60 504 Not connected
ERROR:root:ERROR 61 504 Not connected
ERROR:root:ERROR 62 504 Not connected
ERROR:root:ERROR 63 504 Not connected
ERROR:root:ERROR 64 504 Not connected
ERROR:root:ERROR 65 504 Not connected
ERROR:root:ERROR 66 504 Not connected
ERROR:root:ERROR 67 504 Not connected
ERROR:root:ERROR 68 504 Not connected
ERROR:root:ERROR 69 504 Not connected
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
=========End round :  1 with time : 7.807971477508545 ==============
====================round :  2 ===============================
step2
ERROR:root:ERROR -1 2104 Market data farm connection is OK:hfarm
ERROR:root:ERROR -1 2104 Market data farm connection is OK:jfarm
ERROR:root:ERROR -1 2104 Market data farm connection is OK:usfuture
ERROR:root:ERROR -1 2104 Market data farm connection is OK:eufarm
ERROR:root:ERROR -1 2104 Market data farm connection is OK:cashfarm
ERROR:root:ERROR -1 2104 Market data farm connection is OK:usfarm.us
ERROR:root:ERROR -1 2104 Market data farm connection is OK:usfarm
ERROR:root:ERROR -1 2106 HMDS data farm connection is OK:ilhmds
ERROR:root:ERROR -1 2106 HMDS data farm connection is OK:euhmds
ERROR:root:ERROR -1 2106 HMDS data farm connection is OK:fundfarm
ERROR:root:ERROR -1 2106 HMDS data farm connection is OK:ushmds
serverVersion:124 connectionTime:b'20170821 22:34:17 ICT'
70
HistoricalData.  70  Date: 20170815 Open: 117.23 High: 117.62 Low: 116.58 Close: 117.43 Volume: 7232 Count: 6205 WAP: 117.1295
HistoricalData.  70  Date: 20170816 Open: 117.49 High: 119.59 Low: 117.03 Close: 119.25 Volume: 16468 Count: 11498 WAP: 118.8035
HistoricalData.  70  Date: 20170817 Open: 119.19 High: 119.48 Low: 116.46 Close: 116.47 Volume: 12285 Count: 10072 WAP: 117.4645
HistoricalData.  70  Date: 20170818 Open: 116.0 High: 117.84 Low: 115.46 Close: 116.88 Volume: 14917 Count: 10824 WAP: 116.9795
HistoricalData.  70  Date: 20170821 Open: 116.85 High: 117.4 Low: 116.15 Close: 116.77 Volume: 3471 Count: 2599 WAP: 116.5535
HistoricalDataEnd  70 from 20170814  22:34:19 to 20170821  22:34:19
ERROR:root:ERROR 71 504 Not connected
ERROR:root:ERROR 72 504 Not connected

您需要使用另一个标志来终止历史数据获取,而不是 self.done,正如 IB 指出的那样 "reserved"。

如果 print x.__dict__ 其中 xTestApp 的一个实例,您将看到基础 类 已经使用的内容,而不应该已覆盖,除非您对此处显示的方法所做的那样。

此外,如果您想存储结果,而不仅仅是打印它们;最好像我在这里一样使用队列:

https://gist.github.com/robcarver17/f50aeebc2ecd084f818706d9f05c1eb4

我建议在这里进行一些重新设计。

在您的示例中,您在获得第一个数据后立即使用 self.done=True 断开与 API 的连接。由于请求的数据在非常快速的事件中到达,因此您可以在执行此操作之前获取请求的所有数据。在你的输出中看到的结果:你得到并处理了第一个请求的所有数据,然后你发现自己断开了连接。重复 for i...

  1. 删除 'self.done=True' 行。这一点都不好,你不想要那个。

  2. 重构您的代码,使其在将下一个请求发送到 API 之前等待 historicalDataEnd 事件。 API 可以处理并行请求,但是服务器不能容忍非常频繁的历史数据请求。如果您发送请求的速度太快,您将收到节奏违规错误。请求之间的一些 sleep() 也可能是必要的。

  3. 我假设 app.run() 等待断开连接,这就是您需要 'done' 标志操作的原因。也删除该行。如果确实需要此调用来捕获消息(我不知道您的 TestApp class 及其基础),那么请确保在所有请求的数据到达并处理后调用 disconnect() ,实际上是在 historicalDataEnd() 中.在这种情况下,在下一个循环之前需要重新连接,现在在 app.run() 调用 for j...

  4. 之后丢失了

以下是我建议的修改:

from ibapi import wrapper
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract as IBcontract
from threading import Thread
import queue
import datetime
from ibapi.utils import iswrapper #just for decorator
from ibapi.common import *
from ibapi.contract import *
from ibapi.ticktype import *

class TestApp(wrapper.EWrapper, EClient):

def __init__(self):
    wrapper.EWrapper.__init__(self)
    EClient.__init__(self, wrapper=self)


@iswrapper
def historicalData(self, reqId:int, bar: BarData):
    print("HistoricalData. ", reqId, " Date:", bar.date, "Open:", bar.open,
          "High:", bar.high, "Low:", bar.low, "Close:", bar.close, "Volume:", bar.volume,
          "Count:", bar.barCount, "WAP:", bar.average)
    if  all_data.iloc[reqId,7] == 0:
        all_data.iloc[reqId,7] = bar.close
    # the line below is not necessary
    #self.done = True

@iswrapper
def historicalDataEnd(self, reqId: int, start: str, end: str):
    super().historicalDataEnd(reqId, start, end)
    print("HistoricalDataEnd ", reqId, "from", start, "to", end)
    print('=========End round : ',app.i+1,'with time :',time() - app.t,'==============')

    app.i++
    if app.i==len(all_data):
        app.disconnect() # Disconnect here: get out of app.run() when all data processed
    else:
        # add some sleep here if necessary
        reqNext()


@iswrapper
def historicalDataUpdate(self, reqId: int, bar: BarData):
    print("HistoricalDataUpdate. ", reqId, " Date:", bar.date, "Open:", bar.open,
          "High:", bar.high, "Low:", bar.low, "Close:", bar.close, "Volume:", bar.volume,
          "Count:", bar.barCount, "WAP:", bar.average)


def reqNext():
    print('====================round : ',app.i+1,'===============================')
    contract = Contract()
    contract.symbol = all_data.iloc[app.i,0]
    contract.secType = all_data.iloc[app.i,1]
    contract.currency = all_data.iloc[app.i,3]
    contract.exchange = all_data.iloc[app.i,2]
    app.reqHistoricalData(app.i, contract, "","1 W", "1 day", "Adjusted_Last", 1, 1, False, []) 


def main():
    app = TestApp()
    app.connect("127.0.0.1", 7496, clientId=1234)
    print("serverVersion:%s connectionTime:%s" % (app.serverVersion(),app.twsConnectionTime()))

    app.i = 0
    app.t = time()
    reqNext()
    app.run()
    # when we arrive here, app is disconnected already


if __name__ == "__main__":
    main()

非常感谢 Janos,他真的帮助了我,我稍微改变了一下,它成功了!

谢谢你的想法我会改进这个

因为你的代码有一些错误,所以我让它可以运行。

from ibapi import wrapper
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract as IBcontract
from threading import Thread
import queue
import datetime
from ibapi.utils import iswrapper #just for decorator
from ibapi.common import *
from ibapi.contract import *
from ibapi.ticktype import *

class TestApp(wrapper.EWrapper, EClient):

    def __init__(self):
        wrapper.EWrapper.__init__(self)
        EClient.__init__(self, wrapper=self)


    @iswrapper
    def historicalData(self, reqId:int, bar: BarData):
        print("HistoricalData. ", reqId, " Date:", bar.date, "Open:", bar.open,
              "High:", bar.high, "Low:", bar.low, "Close:", bar.close, "Volume:", bar.volume,
          "Count:", bar.barCount, "WAP:", bar.average)
        if  all_data.iloc[reqId,7] == 0:
            all_data.iloc[reqId,7] = bar.close
    # the line below is not necessary
    #self.done = True

    @iswrapper
    def historicalDataEnd(self, reqId: int, start: str, end: str):
        super().historicalDataEnd(reqId, start, end)
        print("HistoricalDataEnd ", reqId, "from", start, "to", end)
        print('=========End round : ',self.i+1,'with time :',time() - self.t,'==============')

        self.i = self.i+1
        if self.i==len(all_data):
            self.disconnect() # Disconnect here: get out of app.run() when all data processed
        else:
        # add some sleep here if necessary
            reqNext(self.i,self)


    @iswrapper
    def historicalDataUpdate(self, reqId: int, bar: BarData):
        print("HistoricalDataUpdate. ", reqId, " Date:", bar.date, "Open:", bar.open,
              "High:", bar.high, "Low:", bar.low, "Close:", bar.close, "Volume:", bar.volume,
              "Count:", bar.barCount, "WAP:", bar.average)


def reqNext(i,app):
        print('====================round : ',i+1,'===============================')
        contract = Contract()
        contract.symbol = all_data.iloc[i,0]
        contract.secType = all_data.iloc[i,1]
        contract.currency = all_data.iloc[i,3]
        contract.exchange = all_data.iloc[i,2]
        app.reqHistoricalData(i, contract, "","1 W", "1 day", "Adjusted_Last", 1, 1, False, []) 


def main():
    app = TestApp()
    app.connect("127.0.0.1", 7496, clientId=1234)
    print("serverVersion:%s connectionTime:%s" % (app.serverVersion(),app.twsConnectionTime()))
    app.i = 0
    app.t = time()
    reqNext(app.i,app)
    app.run()
    # when we arrive here, app is disconnected already


if __name__ == "__main__":
    main()