为比特币交易所构建订单簿表示
Building an Orderbook representation for a Bitcoin exchange
我正在尝试为 Poloniex Bitcoin exchange. I am subscribing to the Push-API 构建一个 Orderbook 表示,它通过 Websocket 发送 Orderbook 的更新。问题是我的订单簿随着时间的推移变得 不一致,即本应删除的订单仍在我的订单簿中。
下图Orderbook格式为:
Exchange-Name - ASK - Amount - Price | Price - Amount - BID - Exchange-Name
左边(ASK)是卖货币的人。右侧 (BID) 是正在购买货币的人。 BTCUSD、ETHBTC 和 ETHUSD 描述了不同的市场。 BTCUSD表示比特币兑换美元,ETHBTC表示以太币兑换比特币,ETHUSD表示以太坊兑换美元。
Poloniex 以 JSON-格式通过 Websocket 发送更新。这是此类更新的示例:
[
36,
7597659581972377,
8089731807973507,
{},
[
{"data":{"rate":"609.00000029","type":"bid"},"type":"orderBookRemove"},{"data":{"amount":"0.09514285","rate":"609.00000031","type":"bid"},"type":"orderBookModify"}
],
{
"seq":19976127
}
]
- json[0]这个问题可以忽略
json[1] 是市场标识符。这意味着我发送了一个请求,如“订阅市场 BTCUSD”,他们回答“BTCUSD 更新将在标识符号 7597659581972377 下发送”。
json[2]这个问题可以忽略
- json[3]这个问题可以忽略
- json[4] 包含实际更新数据。稍后会详细介绍。
- json[5] 包含序列号。它用于在更新无序到达时正确执行更新。因此,如果我在 1 秒内通过 1 - 3 - 5 - 4 - 2 的顺序收到 5 个更新,它们必须像 1 - 2 - 3 - 4 - 5。每个市场都有不同的 "sequence-number-sequence".
正如我所说,json[4] 包含一个 array 更新。 json[4][array-index]["type"]
有3种:
- orderBookModify: 特定价格的可用数量已更改。
- orderBookRemove: 订单不再可用,必须删除。
- newTrade:可用于建立交易历史。我正在尝试执行的操作不需要它,因此可以忽略它。
json[4][array-index]["data"]
如果是 orderBookRemove 则包含两个值,如果是 orderBookModify 则包含三个值。
- rate: 价格.
- 数量(仅当它是订单簿修改时存在):新数量。
- 类型:询问或出价.
还有一种特殊留言:
[36,8932491360003688,1315671639915103,{},[],{"seq":98045310}]
它只包含一个序列号。它是一种心跳消息,不发送任何更新。
代码
我用了三个容器:
std::map<std::uint64_t,CMarket> m_mMarkets;
std::map<CMarket, long> m_mCurrentSeq;
std::map<CMarket, std::map<long, web::json::value>> m_mStack;
m_mMarkets
用于将市场标识符号映射到市场,因为它存储在我的程序中。
m_mCurrentSeq
用于存储每个市场的当前序号。
m_mStack
按市场和序列号存储更新(这就是 long
的用途)直到它们可以被执行。
这是接收更新的部分:
// ....
// This method can be called asynchronously, so lock the containers.
this->m_muMutex.lock();
// Map the market-identifier to a CMarket object.
CMarket market = this->m_mMarkets.at(json[1].as_number().to_uint64());
// Check if it is a known market. This should never happen!
if(this->m_mMarkets.find(json[1].as_number().to_uint64()) == this->m_mMarkets.end())
{
this->m_muMutex.unlock();
throw std::runtime_error("Received Market update of unknown Market");
}
// Add the update to the execution-queue
this->m_mStack[market][(long)json[5]["seq"].as_integer()] = json;
// Execute the execution-queue
this->executeStack();
this->m_muMutex.unlock();
// ....
现在是执行队列。我想这就是我的错误所在。
函数:"executeStack":
for(auto& market : this->m_mMarkets) // For all markets
{
if(this->m_mCurrentSeq.find(market.second) != this->m_mCurrentSeq.end()) // if market has a sequence number
{
long seqNum = this->m_mCurrentSeq.at(market.second);
// erase old entries
for(std::map<long, web::json::value>::iterator it = this->m_mStack.at(market.second).begin(); it != this->m_mStack.at(market.second).end(); )
{
if((*it).first < seqNum)
it = this->m_mStack.at(market.second).erase(it);
else
++it;
}
// This container is used to store the updates to the Orderbook temporarily.
std::vector<Order> addOrderStack{};
while(this->m_mStack.at(market.second).find(seqNum) != this->m_mStack.at(market.second).end())// has entry for seqNum
{
web::json::value json = this->m_mStack.at(market.second).at(seqNum);
for(auto& v : json[4].as_array())
{
if(v["type"].as_string().compare("orderBookModify") == 0)
{
Order::Type t = v["data"]["type"].as_string().compare("ask") == 0 ? Order::Type::Ask : Order::Type::Bid;
Order newOrder(std::stod(v["data"]["rate"].as_string()), std::stod(v["data"]["amount"].as_string()), t, market.second, this->m_pclParent, v.serialize());
addOrderStack.push_back(newOrder);
} else if(v["type"].as_string().compare("orderBookRemove") == 0)
{
Order::Type t = v["data"]["type"].as_string().compare("ask") == 0 ? Order::Type::Ask : Order::Type::Bid;
Order newOrder(std::stod(v["data"]["rate"].as_string()), 0, t, market.second, this->m_pclParent, v.serialize());
addOrderStack.push_back(newOrder);
} else if(v["type"].as_string().compare("newTrade") == 0)
{
//
} else
{
throw std::runtime_error("Unknown message format");
}
}
this->m_mStack.at(market.second).erase(seqNum);
seqNum++;
}
// The actual OrderList gets modified here. The mistake CANNOT be inside OrderList::addOrderStack, because I am running Orderbooks for other exchanges too and they use the same method to modify the Orderbook, and they do not get inconsistent.
if(addOrderStack.size() > 0)
OrderList::addOrderStack(addOrderStack);
this->m_mCurrentSeq.at(market.second) = seqNum;
}
}
因此,如果运行时间较长,Orderbook 就会变得不一致。这意味着应该删除的订单仍然可用,并且账簿中有错误的条目。我不太清楚为什么会这样。也许我对序列号做错了什么,因为更新堆栈似乎并不总是正确执行。我已经尝试了我想到的一切,但我无法让它发挥作用,现在我不知道可能出了什么问题。如果您有任何问题,请随时提出。
tl;dr:Poloniex API 不完善并且会丢失消息。有些根本就没有到达。我发现无论在世界的哪个位置,所有订阅的用户都会出现这种情况。
希望关于使用 Autobahn|cpp 连接到 Poloniex 的 Websocket API () 的回答有用。我怀疑您已经弄清楚了(否则这个 question/problem 对您来说不存在)。您可能已经了解到,我也有一个用 C++ 编写的加密货币机器人。我断断续续地研究了大约 3.5 年。
你面临的问题也是我必须克服的。在这种情况下,我宁愿不提供我的源代码,因为您处理它的速度会对您的利润率产生巨大影响。但是,我将提供 sudo 代码,它提供了一些非常粗略的见解,让我了解我如何处理 Poloniex 的 Web 套接字事件处理。
//Sudo Code
void someClass::handle_poloniex_ws_event(ws_event event){
if(event.seq_num == expected_seq_num){
process_ws_event(event)
update_expected_seq_num
}
else{
if(in_cache(expected_seq_num){
process_ws_event(from_cache(expected_seq_num))
update_expected_seq_num
}
else{
cache_event(event)
}
}
}
请注意,我上面写的是我实际操作的超级简化版本。我的实际解决方案大约有 500 多行,其中包含 "goto xxx" 和 "goto yyy"。我建议采用 timestamps/cpu 时钟周期计数并与当前 time/cycle 计数进行比较,以帮助您在任何给定时刻做出决定(例如,我应该等待丢失的事件,我是否应该继续处理并注意程序的其余部分可能存在不准确之处,我是否应该使用 GET 请求来重新填充我的 table,等等?)。这里的游戏名称是速度,我相信你知道。祝你好运!希望收到你的来信。 :-)
我正在尝试为 Poloniex Bitcoin exchange. I am subscribing to the Push-API 构建一个 Orderbook 表示,它通过 Websocket 发送 Orderbook 的更新。问题是我的订单簿随着时间的推移变得 不一致,即本应删除的订单仍在我的订单簿中。
下图Orderbook格式为:
Exchange-Name - ASK - Amount - Price | Price - Amount - BID - Exchange-Name
左边(ASK)是卖货币的人。右侧 (BID) 是正在购买货币的人。 BTCUSD、ETHBTC 和 ETHUSD 描述了不同的市场。 BTCUSD表示比特币兑换美元,ETHBTC表示以太币兑换比特币,ETHUSD表示以太坊兑换美元。
Poloniex 以 JSON-格式通过 Websocket 发送更新。这是此类更新的示例:
[
36,
7597659581972377,
8089731807973507,
{},
[
{"data":{"rate":"609.00000029","type":"bid"},"type":"orderBookRemove"},{"data":{"amount":"0.09514285","rate":"609.00000031","type":"bid"},"type":"orderBookModify"}
],
{
"seq":19976127
}
]
- json[0]这个问题可以忽略
json[1] 是市场标识符。这意味着我发送了一个请求,如“订阅市场 BTCUSD”,他们回答“BTCUSD 更新将在标识符号 7597659581972377 下发送”。
json[2]这个问题可以忽略
- json[3]这个问题可以忽略
- json[4] 包含实际更新数据。稍后会详细介绍。
- json[5] 包含序列号。它用于在更新无序到达时正确执行更新。因此,如果我在 1 秒内通过 1 - 3 - 5 - 4 - 2 的顺序收到 5 个更新,它们必须像 1 - 2 - 3 - 4 - 5。每个市场都有不同的 "sequence-number-sequence".
正如我所说,json[4] 包含一个 array 更新。 json[4][array-index]["type"]
有3种:
- orderBookModify: 特定价格的可用数量已更改。
- orderBookRemove: 订单不再可用,必须删除。
- newTrade:可用于建立交易历史。我正在尝试执行的操作不需要它,因此可以忽略它。
json[4][array-index]["data"]
如果是 orderBookRemove 则包含两个值,如果是 orderBookModify 则包含三个值。
- rate: 价格.
- 数量(仅当它是订单簿修改时存在):新数量。
- 类型:询问或出价.
还有一种特殊留言:
[36,8932491360003688,1315671639915103,{},[],{"seq":98045310}]
它只包含一个序列号。它是一种心跳消息,不发送任何更新。
代码
我用了三个容器:
std::map<std::uint64_t,CMarket> m_mMarkets;
std::map<CMarket, long> m_mCurrentSeq;
std::map<CMarket, std::map<long, web::json::value>> m_mStack;
m_mMarkets
用于将市场标识符号映射到市场,因为它存储在我的程序中。
m_mCurrentSeq
用于存储每个市场的当前序号。
m_mStack
按市场和序列号存储更新(这就是 long
的用途)直到它们可以被执行。
这是接收更新的部分:
// ....
// This method can be called asynchronously, so lock the containers.
this->m_muMutex.lock();
// Map the market-identifier to a CMarket object.
CMarket market = this->m_mMarkets.at(json[1].as_number().to_uint64());
// Check if it is a known market. This should never happen!
if(this->m_mMarkets.find(json[1].as_number().to_uint64()) == this->m_mMarkets.end())
{
this->m_muMutex.unlock();
throw std::runtime_error("Received Market update of unknown Market");
}
// Add the update to the execution-queue
this->m_mStack[market][(long)json[5]["seq"].as_integer()] = json;
// Execute the execution-queue
this->executeStack();
this->m_muMutex.unlock();
// ....
现在是执行队列。我想这就是我的错误所在。
函数:"executeStack":
for(auto& market : this->m_mMarkets) // For all markets
{
if(this->m_mCurrentSeq.find(market.second) != this->m_mCurrentSeq.end()) // if market has a sequence number
{
long seqNum = this->m_mCurrentSeq.at(market.second);
// erase old entries
for(std::map<long, web::json::value>::iterator it = this->m_mStack.at(market.second).begin(); it != this->m_mStack.at(market.second).end(); )
{
if((*it).first < seqNum)
it = this->m_mStack.at(market.second).erase(it);
else
++it;
}
// This container is used to store the updates to the Orderbook temporarily.
std::vector<Order> addOrderStack{};
while(this->m_mStack.at(market.second).find(seqNum) != this->m_mStack.at(market.second).end())// has entry for seqNum
{
web::json::value json = this->m_mStack.at(market.second).at(seqNum);
for(auto& v : json[4].as_array())
{
if(v["type"].as_string().compare("orderBookModify") == 0)
{
Order::Type t = v["data"]["type"].as_string().compare("ask") == 0 ? Order::Type::Ask : Order::Type::Bid;
Order newOrder(std::stod(v["data"]["rate"].as_string()), std::stod(v["data"]["amount"].as_string()), t, market.second, this->m_pclParent, v.serialize());
addOrderStack.push_back(newOrder);
} else if(v["type"].as_string().compare("orderBookRemove") == 0)
{
Order::Type t = v["data"]["type"].as_string().compare("ask") == 0 ? Order::Type::Ask : Order::Type::Bid;
Order newOrder(std::stod(v["data"]["rate"].as_string()), 0, t, market.second, this->m_pclParent, v.serialize());
addOrderStack.push_back(newOrder);
} else if(v["type"].as_string().compare("newTrade") == 0)
{
//
} else
{
throw std::runtime_error("Unknown message format");
}
}
this->m_mStack.at(market.second).erase(seqNum);
seqNum++;
}
// The actual OrderList gets modified here. The mistake CANNOT be inside OrderList::addOrderStack, because I am running Orderbooks for other exchanges too and they use the same method to modify the Orderbook, and they do not get inconsistent.
if(addOrderStack.size() > 0)
OrderList::addOrderStack(addOrderStack);
this->m_mCurrentSeq.at(market.second) = seqNum;
}
}
因此,如果运行时间较长,Orderbook 就会变得不一致。这意味着应该删除的订单仍然可用,并且账簿中有错误的条目。我不太清楚为什么会这样。也许我对序列号做错了什么,因为更新堆栈似乎并不总是正确执行。我已经尝试了我想到的一切,但我无法让它发挥作用,现在我不知道可能出了什么问题。如果您有任何问题,请随时提出。
tl;dr:Poloniex API 不完善并且会丢失消息。有些根本就没有到达。我发现无论在世界的哪个位置,所有订阅的用户都会出现这种情况。
希望关于使用 Autobahn|cpp 连接到 Poloniex 的 Websocket API (
你面临的问题也是我必须克服的。在这种情况下,我宁愿不提供我的源代码,因为您处理它的速度会对您的利润率产生巨大影响。但是,我将提供 sudo 代码,它提供了一些非常粗略的见解,让我了解我如何处理 Poloniex 的 Web 套接字事件处理。
//Sudo Code
void someClass::handle_poloniex_ws_event(ws_event event){
if(event.seq_num == expected_seq_num){
process_ws_event(event)
update_expected_seq_num
}
else{
if(in_cache(expected_seq_num){
process_ws_event(from_cache(expected_seq_num))
update_expected_seq_num
}
else{
cache_event(event)
}
}
}
请注意,我上面写的是我实际操作的超级简化版本。我的实际解决方案大约有 500 多行,其中包含 "goto xxx" 和 "goto yyy"。我建议采用 timestamps/cpu 时钟周期计数并与当前 time/cycle 计数进行比较,以帮助您在任何给定时刻做出决定(例如,我应该等待丢失的事件,我是否应该继续处理并注意程序的其余部分可能存在不准确之处,我是否应该使用 GET 请求来重新填充我的 table,等等?)。这里的游戏名称是速度,我相信你知道。祝你好运!希望收到你的来信。 :-)