ibpy 获取投资组合信息:Interactive Broker,Python
ibpy Getting portfolio information: Interactive Broker, Python
我已成功编写代码,使用以下代码从交易平台的演示版中提取有关我的头寸的信息:
tws_conn = conn.Connection.create(port=7497, clientId=100)
tws_conn.register( acct_update, msg.updateAccountValue,
msg.updateAccountTime, msg.updatePortfolio)
tws_conn.connect()
tws_conn.reqPositions()
tws_conn.reqAccountUpdates(True,'DU15181')
但是,它将信息转储为:
<updatePortfolio contract=<Packages.IbPy.ib.ext.Contract.Contract object at 0x06B0FE30>, position=-10, marketPrice=3.4000001, marketValue=-3400.0, averageCost=334.345, unrealizedPNL=-56.55, realizedPNL=0.0, accountName=DU15181>
我想知道如何将上述信息存储到一个数组中,该数组包含用于合约或股票代码的列、投资组合中的数量和不同列中的购买价格
"columns" 要求有些模糊,如果列表中的每个条目本身就是一个列表(并且子列表中的每个索引位置始终包含相同的字段),则列表已经满足要求
您收到的消息发送到您在tws注册的回调。每个 "dumped" 字段都可以使用 "dot" 符号或通过 "keys"、"values" 和 "items"
等 dict-like 方法访问
主要的挑战是合约:IB 允许访问大量交易所和不同的交易资产。 "contract" 对象(以及 IB 后端中的信息)似乎反映了提供对所有内容的 uniform/unified 访问的努力,同时表明他们必须建立在现有基础设施之上。
你提到了 "stock ticker" 所以我猜你可能会对类似这样的东西感到满意: IBM-STK-SMART 使用 "ib" 行业标准符号(第二个字段表示它是股票并且第三个是 IB 将使用 SMART 路由进行订单和价格更新)
让我们来看看列表列表:
def acct_update(self, msg):
# Assume the function is a method in a class with access to a member
# 'portfolio' which is a list
if isinstance(msg, ib.opt.message.updatePortfolio):
c = msg.contract
ticker = '%s-%s-%s' % (contract.m_symbol, c.m_secType, c.m_exchange)
entry = [ticker]
entry.extend(msg.values)
self.portfolio.append(entry)
不幸的是,ibpy
消息中的 "keys" 方法不是 classmethod
,但名称实际上是 __slots__
。在持有 acct_update
方法的 class 中,您可以执行以下操作:
class MyClass(object):
portfields = ['ticker'] + ib.opt.message.updatePortfolio.__slots__
如果不是通过索引访问列表中的字段,您更喜欢 ibpy 已经提供的字段名称,您也可以制作一个字典的字典
def __init__(self, ...)
self.portfolio = collections.OrderedDict()
def acct_update(self, msg):
# Assume the function is a method in a class with access to a member
# 'portfolio' which is a list
if isinstance(msg, ib.opt.message.updatePortfolio):
c = msg.contract
ticker = '%s-%s-%s' % (contract.m_symbol, c.m_secType, c.m_exchange)
self.portfolio[ticker] = collections.OrderedDict(msg.items())
这将允许您按名称获取最新的代码信息并按名称访问字段。
如果您需要保留每个代码的历史记录
def __init__(self, ...)
self.portfolio = collections.defaultdict(list)
def acct_update(self, msg):
# Assume the function is a method in a class with access to a member
# 'portfolio' which is a list
if isinstance(msg, ib.opt.message.updatePortfolio):
c = msg.contract
ticker = '%s-%s-%s' % (contract.m_symbol, c.m_secType, c.m_exchange)
self.portfolio[ticker].extend(msg.values())
您可以存储 "items" 而不是值,然后在需要时访问元组。
考虑到长度和主要变化,添加另一个答案。鉴于我正在处理一些代码,我用它回答了另一个问题,并且稍作修改也可以用于投资组合。
代码:
from __future__ import (absolute_import, division, print_function,)
# unicode_literals)
import collections
import sys
if sys.version_info.major == 2:
import Queue as queue
import itertools
map = itertools.imap
else: # >= 3
import queue
import ib.opt
import ib.ext.Contract
class IbManager(object):
def __init__(self, timeout=20, **kwargs):
self.q = queue.Queue()
self.timeout = 20
self.con = ib.opt.ibConnection(**kwargs)
self.con.registerAll(self.watcher)
self.msgs = {
ib.opt.message.error: self.errors,
ib.opt.message.updatePortfolio: self.acct_update,
ib.opt.message.accountDownloadEnd: self.acct_update,
}
# Skip the registered ones plus noisy ones from acctUpdate
self.skipmsgs = tuple(self.msgs.keys()) + (
ib.opt.message.updateAccountValue,
ib.opt.message.updateAccountTime)
for msgtype, handler in self.msgs.items():
self.con.register(handler, msgtype)
self.con.connect()
def watcher(self, msg):
if isinstance(msg, ib.opt.message.error):
if msg.errorCode > 2000: # informative message
print('-' * 10, msg)
elif not isinstance(msg, self.skipmsgs):
print('-' * 10, msg)
def errors(self, msg):
if msg.id is None: # something is very wrong in the connection to tws
self.q.put((True, -1, 'Lost Connection to TWS'))
elif msg.errorCode < 1000:
self.q.put((True, msg.errorCode, msg.errorMsg))
def acct_update(self, msg):
self.q.put((False, -1, msg))
def get_account_update(self):
self.con.reqAccountUpdates(True, 'D999999')
portfolio = list()
while True:
try:
err, mid, msg = self.q.get(block=True, timeout=self.timeout)
except queue.Empty:
err, mid, msg = True, -1, "Timeout receiving information"
break
if isinstance(msg, ib.opt.message.accountDownloadEnd):
break
if isinstance(msg, ib.opt.message.updatePortfolio):
c = msg.contract
ticker = '%s-%s-%s' % (c.m_symbol, c.m_secType, c.m_exchange)
entry = collections.OrderedDict(msg.items())
# Don't do this if contract object needs to be referenced later
entry['contract'] = ticker # replace object with the ticker
portfolio.append(entry)
# return list of contract details, followed by:
# last return code (False means no error / True Error)
# last error code or None if no error
# last error message or None if no error
# last error message
return portfolio, err, mid, msg
ibm = IbManager(clientId=5001)
portfolio, err, errid, errmsg = ibm.get_account_update()
if portfolio:
print(','.join(portfolio[0].keys()))
for p in portfolio:
print(','.join(map(str, p.values())))
sys.exit(0) # Ensure ib thread is terminated
结果
Server Version: 76
TWS Time at connection:20160113 00:15:29 CET
---------- <managedAccounts accountsList=D999999>
---------- <nextValidId orderId=1>
---------- <error id=-1, errorCode=2104, errorMsg=Market data farm connection is OK:usfuture>
---------- <error id=-1, errorCode=2104, errorMsg=Market data farm connection is OK:eufarm>
---------- <error id=-1, errorCode=2104, errorMsg=Market data farm connection is OK:cashfarm>
---------- <error id=-1, errorCode=2104, errorMsg=Market data farm connection is OK:usfarm.us>
---------- <error id=-1, errorCode=2106, errorMsg=HMDS data farm connection is OK:ushmds.us>
---------- <error id=-1, errorCode=2106, errorMsg=HMDS data farm connection is OK:ilhmds>
---------- <error id=-1, errorCode=2106, errorMsg=HMDS data farm connection is OK:cashhmds>
---------- <error id=-1, errorCode=2106, errorMsg=HMDS data farm connection is OK:ethmds>
contract,position,marketPrice,marketValue,averageCost,unrealizedPNL,realizedPNL,accountName
IBM-STK-,10,25.0,250.0,210.0,40.0,0.0,D999999
最后2行可以直接导入(例如)Excel。或者给定它是一个字典列表(打印出来的内容),它可以在脚本中进一步操作。
除了使用ibpy,我还会导入IBWrapper,可以从以下网址下载
Github: https://github.com/anthonyng2/ib
import pandas as pd
import numpy as np
import time
from IBWrapper import IBWrapper, contract
from ib.ext.EClientSocket import EClientSocket
accountName = "Your Account ID"
callback = IBWrapper() # Instantiate IBWrapper. callback
tws = EClientSocket(callback) # Instantiate EClientSocket and return data to callback
host = ""
port = 4002 # It is for default port no. in demo account
clientId = 25
tws.eConnect(host, port, clientId) # connect to TWS
create = contract() # Instantiate contract class
callback.initiate_variables()
tws.reqAccountUpdates(1, accountName)
time.sleep(2)
它们是您更新后的账户价值和投资组合摘要:
accvalue = pd.DataFrame(callback.update_AccountValue, columns = ['key', 'value', 'currency', 'accountName']) #[:199]
portfolio = pd.DataFrame(callback.update_Portfolio, columns=['Contract ID','Currency', 'Expiry','Include Expired','Local Symbol','Multiplier','Primary Exchange','Right',
'Security Type','Strike','Symbol','Trading Class','Position','Market Price','Market Value',
'Average Cost', 'Unrealised PnL', 'Realised PnL', 'Account Name'])
callback.update_AccountTime
print("AccountValue: \n" + str(accvalue))
print("portfolio: \n" + str(portfolio))
这是您更新后的职位摘要:
# Position Summary
tws.reqPositions()
time.sleep(2)
dat = pd.DataFrame(callback.update_Position,
columns=['Account','Contract ID','Currency','Exchange','Expiry',
'Include Expired','Local Symbol','Multiplier','Right',
'Security Type','Strike','Symbol','Trading Class',
'Position','Average Cost'])
dat[dat["Account"] == accountName]
print("Position Summary: \n" + str(dat))
我编写了以下代码以允许我直接从 Interactive Brokers API.
读取我的头寸和资产净值
# Interactive Brokers functions to import data
def read_positions(): #read all accounts positions and return DataFrame with information
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.common import TickerId
import pandas as pd
class ib_class(EWrapper, EClient):
def __init__(self):
EClient.__init__(self, self)
self.all_positions = pd.DataFrame([], columns = ['Account','Symbol', 'Quantity', 'Average Cost'])
def position(self, account, contract, pos, avgCost):
index = str(account)+str(contract.symbol)
self.all_positions.loc[index]=account,contract.symbol,pos,avgCost
def error(self, reqId:TickerId, errorCode:int, errorString:str):
if reqId > -1:
print("Error. Id: " , reqId, " Code: " , errorCode , " Msg: " , errorString)
def positionEnd(self):
super().positionEnd()
self.disconnect()
ib_api = ib_class()
ib_api.connect("127.0.0.1", 7496, 0)
ib_api.reqPositions()
current_positions = ib_api.all_positions
ib_api.run()
return(current_positions)
def read_navs(): #read all accounts NAVs
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.common import TickerId
import pandas as pd
class ib_class(EWrapper, EClient):
def __init__(self):
EClient.__init__(self, self)
self.all_accounts = pd.DataFrame([], columns = ['reqId','Account', 'Tag', 'Value' , 'Currency'])
def accountSummary(self, reqId, account, tag, value, currency):
if tag == 'NetLiquidationByCurrency':
index = str(account)
self.all_accounts.loc[index]=reqId, account, tag, value, currency
def error(self, reqId:TickerId, errorCode:int, errorString:str):
if reqId > -1:
print("Error. Id: " , reqId, " Code: " , errorCode , " Msg: " , errorString)
def accountSummaryEnd(self, reqId:int):
super().accountSummaryEnd(reqId)
self.disconnect()
ib_api = ib_class()
ib_api.connect("127.0.0.1", 7496, 0)
ib_api.reqAccountSummary(9001,"All","$LEDGER")
current_nav = ib_api.all_accounts
ib_api.run()
return(current_nav)
为了测试代码,我将其保存在名为 IB_API 和 运行 的 .py 文件中,如下所示:
import IB_API
print("Testing IB's API as an imported library:")
all_positions = IB_API.read_positions()
all_navs = IB_API.read_navs()
print("Test ended")
我仍在努力避免错误 [WinError 10038] "An operation was attempted on something that is not a socket" - 请参阅我关于该主题的问题
我已成功编写代码,使用以下代码从交易平台的演示版中提取有关我的头寸的信息:
tws_conn = conn.Connection.create(port=7497, clientId=100)
tws_conn.register( acct_update, msg.updateAccountValue,
msg.updateAccountTime, msg.updatePortfolio)
tws_conn.connect()
tws_conn.reqPositions()
tws_conn.reqAccountUpdates(True,'DU15181')
但是,它将信息转储为:
<updatePortfolio contract=<Packages.IbPy.ib.ext.Contract.Contract object at 0x06B0FE30>, position=-10, marketPrice=3.4000001, marketValue=-3400.0, averageCost=334.345, unrealizedPNL=-56.55, realizedPNL=0.0, accountName=DU15181>
我想知道如何将上述信息存储到一个数组中,该数组包含用于合约或股票代码的列、投资组合中的数量和不同列中的购买价格
"columns" 要求有些模糊,如果列表中的每个条目本身就是一个列表(并且子列表中的每个索引位置始终包含相同的字段),则列表已经满足要求
您收到的消息发送到您在tws注册的回调。每个 "dumped" 字段都可以使用 "dot" 符号或通过 "keys"、"values" 和 "items"
等 dict-like 方法访问主要的挑战是合约:IB 允许访问大量交易所和不同的交易资产。 "contract" 对象(以及 IB 后端中的信息)似乎反映了提供对所有内容的 uniform/unified 访问的努力,同时表明他们必须建立在现有基础设施之上。
你提到了 "stock ticker" 所以我猜你可能会对类似这样的东西感到满意: IBM-STK-SMART 使用 "ib" 行业标准符号(第二个字段表示它是股票并且第三个是 IB 将使用 SMART 路由进行订单和价格更新)
让我们来看看列表列表:
def acct_update(self, msg):
# Assume the function is a method in a class with access to a member
# 'portfolio' which is a list
if isinstance(msg, ib.opt.message.updatePortfolio):
c = msg.contract
ticker = '%s-%s-%s' % (contract.m_symbol, c.m_secType, c.m_exchange)
entry = [ticker]
entry.extend(msg.values)
self.portfolio.append(entry)
不幸的是,ibpy
消息中的 "keys" 方法不是 classmethod
,但名称实际上是 __slots__
。在持有 acct_update
方法的 class 中,您可以执行以下操作:
class MyClass(object):
portfields = ['ticker'] + ib.opt.message.updatePortfolio.__slots__
如果不是通过索引访问列表中的字段,您更喜欢 ibpy 已经提供的字段名称,您也可以制作一个字典的字典
def __init__(self, ...)
self.portfolio = collections.OrderedDict()
def acct_update(self, msg):
# Assume the function is a method in a class with access to a member
# 'portfolio' which is a list
if isinstance(msg, ib.opt.message.updatePortfolio):
c = msg.contract
ticker = '%s-%s-%s' % (contract.m_symbol, c.m_secType, c.m_exchange)
self.portfolio[ticker] = collections.OrderedDict(msg.items())
这将允许您按名称获取最新的代码信息并按名称访问字段。
如果您需要保留每个代码的历史记录
def __init__(self, ...)
self.portfolio = collections.defaultdict(list)
def acct_update(self, msg):
# Assume the function is a method in a class with access to a member
# 'portfolio' which is a list
if isinstance(msg, ib.opt.message.updatePortfolio):
c = msg.contract
ticker = '%s-%s-%s' % (contract.m_symbol, c.m_secType, c.m_exchange)
self.portfolio[ticker].extend(msg.values())
您可以存储 "items" 而不是值,然后在需要时访问元组。
考虑到长度和主要变化,添加另一个答案。鉴于我正在处理一些代码,我用它回答了另一个问题,并且稍作修改也可以用于投资组合。
代码:
from __future__ import (absolute_import, division, print_function,)
# unicode_literals)
import collections
import sys
if sys.version_info.major == 2:
import Queue as queue
import itertools
map = itertools.imap
else: # >= 3
import queue
import ib.opt
import ib.ext.Contract
class IbManager(object):
def __init__(self, timeout=20, **kwargs):
self.q = queue.Queue()
self.timeout = 20
self.con = ib.opt.ibConnection(**kwargs)
self.con.registerAll(self.watcher)
self.msgs = {
ib.opt.message.error: self.errors,
ib.opt.message.updatePortfolio: self.acct_update,
ib.opt.message.accountDownloadEnd: self.acct_update,
}
# Skip the registered ones plus noisy ones from acctUpdate
self.skipmsgs = tuple(self.msgs.keys()) + (
ib.opt.message.updateAccountValue,
ib.opt.message.updateAccountTime)
for msgtype, handler in self.msgs.items():
self.con.register(handler, msgtype)
self.con.connect()
def watcher(self, msg):
if isinstance(msg, ib.opt.message.error):
if msg.errorCode > 2000: # informative message
print('-' * 10, msg)
elif not isinstance(msg, self.skipmsgs):
print('-' * 10, msg)
def errors(self, msg):
if msg.id is None: # something is very wrong in the connection to tws
self.q.put((True, -1, 'Lost Connection to TWS'))
elif msg.errorCode < 1000:
self.q.put((True, msg.errorCode, msg.errorMsg))
def acct_update(self, msg):
self.q.put((False, -1, msg))
def get_account_update(self):
self.con.reqAccountUpdates(True, 'D999999')
portfolio = list()
while True:
try:
err, mid, msg = self.q.get(block=True, timeout=self.timeout)
except queue.Empty:
err, mid, msg = True, -1, "Timeout receiving information"
break
if isinstance(msg, ib.opt.message.accountDownloadEnd):
break
if isinstance(msg, ib.opt.message.updatePortfolio):
c = msg.contract
ticker = '%s-%s-%s' % (c.m_symbol, c.m_secType, c.m_exchange)
entry = collections.OrderedDict(msg.items())
# Don't do this if contract object needs to be referenced later
entry['contract'] = ticker # replace object with the ticker
portfolio.append(entry)
# return list of contract details, followed by:
# last return code (False means no error / True Error)
# last error code or None if no error
# last error message or None if no error
# last error message
return portfolio, err, mid, msg
ibm = IbManager(clientId=5001)
portfolio, err, errid, errmsg = ibm.get_account_update()
if portfolio:
print(','.join(portfolio[0].keys()))
for p in portfolio:
print(','.join(map(str, p.values())))
sys.exit(0) # Ensure ib thread is terminated
结果
Server Version: 76
TWS Time at connection:20160113 00:15:29 CET
---------- <managedAccounts accountsList=D999999>
---------- <nextValidId orderId=1>
---------- <error id=-1, errorCode=2104, errorMsg=Market data farm connection is OK:usfuture>
---------- <error id=-1, errorCode=2104, errorMsg=Market data farm connection is OK:eufarm>
---------- <error id=-1, errorCode=2104, errorMsg=Market data farm connection is OK:cashfarm>
---------- <error id=-1, errorCode=2104, errorMsg=Market data farm connection is OK:usfarm.us>
---------- <error id=-1, errorCode=2106, errorMsg=HMDS data farm connection is OK:ushmds.us>
---------- <error id=-1, errorCode=2106, errorMsg=HMDS data farm connection is OK:ilhmds>
---------- <error id=-1, errorCode=2106, errorMsg=HMDS data farm connection is OK:cashhmds>
---------- <error id=-1, errorCode=2106, errorMsg=HMDS data farm connection is OK:ethmds>
contract,position,marketPrice,marketValue,averageCost,unrealizedPNL,realizedPNL,accountName
IBM-STK-,10,25.0,250.0,210.0,40.0,0.0,D999999
最后2行可以直接导入(例如)Excel。或者给定它是一个字典列表(打印出来的内容),它可以在脚本中进一步操作。
除了使用ibpy,我还会导入IBWrapper,可以从以下网址下载 Github: https://github.com/anthonyng2/ib
import pandas as pd
import numpy as np
import time
from IBWrapper import IBWrapper, contract
from ib.ext.EClientSocket import EClientSocket
accountName = "Your Account ID"
callback = IBWrapper() # Instantiate IBWrapper. callback
tws = EClientSocket(callback) # Instantiate EClientSocket and return data to callback
host = ""
port = 4002 # It is for default port no. in demo account
clientId = 25
tws.eConnect(host, port, clientId) # connect to TWS
create = contract() # Instantiate contract class
callback.initiate_variables()
tws.reqAccountUpdates(1, accountName)
time.sleep(2)
它们是您更新后的账户价值和投资组合摘要:
accvalue = pd.DataFrame(callback.update_AccountValue, columns = ['key', 'value', 'currency', 'accountName']) #[:199]
portfolio = pd.DataFrame(callback.update_Portfolio, columns=['Contract ID','Currency', 'Expiry','Include Expired','Local Symbol','Multiplier','Primary Exchange','Right',
'Security Type','Strike','Symbol','Trading Class','Position','Market Price','Market Value',
'Average Cost', 'Unrealised PnL', 'Realised PnL', 'Account Name'])
callback.update_AccountTime
print("AccountValue: \n" + str(accvalue))
print("portfolio: \n" + str(portfolio))
这是您更新后的职位摘要:
# Position Summary
tws.reqPositions()
time.sleep(2)
dat = pd.DataFrame(callback.update_Position,
columns=['Account','Contract ID','Currency','Exchange','Expiry',
'Include Expired','Local Symbol','Multiplier','Right',
'Security Type','Strike','Symbol','Trading Class',
'Position','Average Cost'])
dat[dat["Account"] == accountName]
print("Position Summary: \n" + str(dat))
我编写了以下代码以允许我直接从 Interactive Brokers API.
读取我的头寸和资产净值# Interactive Brokers functions to import data
def read_positions(): #read all accounts positions and return DataFrame with information
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.common import TickerId
import pandas as pd
class ib_class(EWrapper, EClient):
def __init__(self):
EClient.__init__(self, self)
self.all_positions = pd.DataFrame([], columns = ['Account','Symbol', 'Quantity', 'Average Cost'])
def position(self, account, contract, pos, avgCost):
index = str(account)+str(contract.symbol)
self.all_positions.loc[index]=account,contract.symbol,pos,avgCost
def error(self, reqId:TickerId, errorCode:int, errorString:str):
if reqId > -1:
print("Error. Id: " , reqId, " Code: " , errorCode , " Msg: " , errorString)
def positionEnd(self):
super().positionEnd()
self.disconnect()
ib_api = ib_class()
ib_api.connect("127.0.0.1", 7496, 0)
ib_api.reqPositions()
current_positions = ib_api.all_positions
ib_api.run()
return(current_positions)
def read_navs(): #read all accounts NAVs
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.common import TickerId
import pandas as pd
class ib_class(EWrapper, EClient):
def __init__(self):
EClient.__init__(self, self)
self.all_accounts = pd.DataFrame([], columns = ['reqId','Account', 'Tag', 'Value' , 'Currency'])
def accountSummary(self, reqId, account, tag, value, currency):
if tag == 'NetLiquidationByCurrency':
index = str(account)
self.all_accounts.loc[index]=reqId, account, tag, value, currency
def error(self, reqId:TickerId, errorCode:int, errorString:str):
if reqId > -1:
print("Error. Id: " , reqId, " Code: " , errorCode , " Msg: " , errorString)
def accountSummaryEnd(self, reqId:int):
super().accountSummaryEnd(reqId)
self.disconnect()
ib_api = ib_class()
ib_api.connect("127.0.0.1", 7496, 0)
ib_api.reqAccountSummary(9001,"All","$LEDGER")
current_nav = ib_api.all_accounts
ib_api.run()
return(current_nav)
为了测试代码,我将其保存在名为 IB_API 和 运行 的 .py 文件中,如下所示:
import IB_API
print("Testing IB's API as an imported library:")
all_positions = IB_API.read_positions()
all_navs = IB_API.read_navs()
print("Test ended")
我仍在努力避免错误 [WinError 10038] "An operation was attempted on something that is not a socket" - 请参阅我关于该主题的问题