What might cause a difficult-to-reproduce truncation of a Hyper HTTP response?

I've run into a bug where my Hyper HTTP responses are being truncated to a specific size (7829 bytes). Making the same requests with cURL works fine.

The requests query a JSON endpoint for data. The response-handling code has since been shuffled around quite a bit, because a relatively involved rate-limiting procedure is used to make many of these requests at once. However, even when only a single request is made, the response is still truncated.

Before the rate limiting was implemented and a large refactoring was done, the program handled these responses correctly.

I made the minimal example below, but it fails to reproduce the problem. At this point I'm not sure where else to look. The codebase is fairly complex, and iteratively expanding the reproduction example is difficult, especially when I don't know what might be causing this.

In what ways might Hyper's response body get truncated? The response body is acquired in the handle function below.

#![feature(use_nested_groups)]
extern crate futures;
extern crate hyper;
extern crate hyper_tls;
extern crate tokio_core;

use futures::{Future, Stream};
use hyper::{Body, Chunk, Client, Method, Request, Response};
use hyper_tls::HttpsConnector;
use tokio_core::reactor::Core;
use std::env;

fn main() {
    let mut core = Core::new().unwrap();
    let client = Client::configure()
        .connector(HttpsConnector::new(4, &core.handle()).unwrap())
        .build(&core.handle());

    fn handle(response: Response<Body>) -> Box<Future<Item = usize, Error = hyper::Error>> {
        Box::new(
            response
                .body()
                .concat2()
                .map(move |body: Chunk| -> usize { body.len() }),
        )
    }

    let args: Vec<String> = env::args().collect();
    let uri = &args[1];
    let req = Request::new(Method::Get, uri.parse().unwrap());

    let response_body_length = {
        let future = Box::new(client.request(req).map(handle).flatten());
        core.run(future).unwrap()
    };

    println!("response body length: {}", response_body_length);
}

The offending code:

extern crate serde;
extern crate serde_json;
use futures::{future, stream, Future, Stream};
use hyper;
use hyper::{client, Body, Chunk, Client, Headers, Method, Request, Response, header::Accept,
            header::Date as DateHeader, header::RetryAfter};
use hyper_tls::HttpsConnector;
use tokio_core::reactor::Core;
use models::Bucket;
use std::thread;
use std::time::{Duration, UNIX_EPOCH};
use std::str;

header! { (XRateLimitRemaining, "x-ratelimit-remaining") => [String] }

#[derive(Debug)]
struct Uri(pub String);

const MAX_REQ_SIZE: u32 = 500;

fn make_uri(symbol: &str, page_ix: u32) -> Uri {
    Uri(format!(
        "https://www.bitmex.com/api/v1/trade/bucketed?\
         symbol={symbol}&\
         columns={columns}&\
         partial=false&\
         reverse=true&\
         binSize={bin_size}&\
         count={count}&\
         start={start}",
        symbol = symbol,
        columns = "close,timestamp",
        bin_size = "5m",
        count = MAX_REQ_SIZE,
        start = 0 + MAX_REQ_SIZE * page_ix
    ))
}

#[derive(Debug)]
struct RateLimitInfo {
    remaining_reqs: u32,
    retry_after: Option<Duration>,
}

impl RateLimitInfo {
    fn default() -> RateLimitInfo {
        RateLimitInfo {
            remaining_reqs: 1,
            retry_after: None,
        }
    }
    fn from<T>(resp: &Response<T>) -> RateLimitInfo {
        let headers = resp.headers();
        let remaining_reqs = headers
            .get::<XRateLimitRemaining>()
            .unwrap_or_else(|| panic!("x-ratelimit-remaining not on request."))
            .parse()
            .unwrap();
        let retry_after = match headers.get::<RetryAfter>() {
            Some(RetryAfter::Delay(duration)) => Some(*duration),
            _ => None,
        };
        RateLimitInfo {
            remaining_reqs,
            retry_after,
        }
    }
}

fn resp_dated_later<'a>(a: &'a Response<Body>, b: &'a Response<Body>) -> &'a Response<Body> {
    let get_date = |resp: &Response<Body>| {
        let headers: &Headers = resp.headers();
        **headers.get::<DateHeader>().unwrap()
    };
    if get_date(&a) > get_date(&b) {
        a
    } else {
        b
    }
}

#[derive(Debug)]
struct Query {
    uri: Uri,
    response: Option<Response<Body>>,
}

impl Query {
    fn from_uri(uri: Uri) -> Query {
        Query {
            uri: uri,
            response: None,
        }
    }
}

fn query_good(q: &Query) -> bool {
    match &q.response {
        Some(response) => response.status().is_success(),
        _ => false,
    }
}

type HttpsClient = hyper::Client<HttpsConnector<client::HttpConnector>>;

type FutureQuery = Box<Future<Item = Query, Error = hyper::Error>>;

fn to_future(x: Query) -> FutureQuery {
    Box::new(future::ok(x))
}

fn exec_if_needed(client: &HttpsClient, query: Query) -> FutureQuery {
    fn exec(client: &HttpsClient, q: Query) -> FutureQuery {
        println!("exec: {:?}", q);
        let uri = q.uri;
        let req = {
            let mut req = Request::new(Method::Get, uri.0.parse().unwrap());
            req.headers_mut().set(Accept::json());
            req
        };
        Box::new(
            client
                .request(req)
                .inspect(|resp| println!("HTTP {}", resp.status()))
                .map(|resp| Query {
                    uri: uri,
                    response: Some(resp),
                }),
        )
    }
    if query_good(&query) {
        to_future(query)
    } else {
        exec(client, query)
    }
}

type BoxedFuture<T> = Box<Future<Item = T, Error = hyper::Error>>;

fn do_batch(client: &HttpsClient, queries: Vec<Query>) -> BoxedFuture<Vec<Query>> {
    println!("do_batch() {} queries", queries.len());
    let exec_if_needed = |q| exec_if_needed(client, q);
    let futures = queries.into_iter().map(exec_if_needed);
    println!("do_batch() futures {:?}", futures);
    Box::new(
        stream::futures_ordered(futures).collect(), //future::join_all(futures)
    )
}

fn take<T>(right: &mut Vec<T>, suggested_n: usize) -> Vec<T> {
    let n: usize = if right.len() < suggested_n {
        right.len()
    } else {
        suggested_n
    };
    let left = right.drain(0..n);
    return left.collect();
}

type BoxedResponses = Box<Vec<Response<Body>>>;

fn batched_throttle(uris: Vec<Uri>) -> BoxedResponses {
    println!("batched_throttle({} uris)", uris.len());
    let mut core = Core::new().unwrap();
    let client = Client::configure()
        .connector(HttpsConnector::new(4, &core.handle()).unwrap())
        .build(&core.handle());

    let mut rate_limit_info = RateLimitInfo::default();

    let mut queries_right: Vec<Query> = uris.into_iter().map(Query::from_uri).collect();

    loop {
        let mut queries_left: Vec<Query> = Vec::with_capacity(queries_right.len());

        println!("batched_throttle: starting inner loop");
        loop {
            // throttle program during testing
            thread::sleep(Duration::from_millis(800));
            println!("batched_throttle: {:?}", rate_limit_info);
            if let Some(retry_after) = rate_limit_info.retry_after {
                println!("batched_throttle: retrying after {:?}", retry_after);
                thread::sleep(retry_after)
            }
            if queries_right.is_empty() {
                break;
            }
            let mut queries_mid = {
                let ri_count = rate_limit_info.remaining_reqs;
                let iter_req_count = if ri_count == 0 { 1 } else { ri_count };
                println!("batched_throttle: iter_req_count {}", iter_req_count);
                take(&mut queries_right, iter_req_count as usize)
            };
            println!(
                "batched_throttle: \
                 queries_right.len() {}, \
                 queries_left.len() {}, \
                 queries_mid.len() {})",
                queries_right.len(),
                queries_left.len(),
                queries_mid.len()
            );
            if queries_mid.iter().all(query_good) {
                println!("batched_throttle: queries_mid.iter().all(query_good)");
                continue;
            }
            queries_mid = { core.run(do_batch(&client, queries_mid)).unwrap() };
            rate_limit_info = {
                let create_very_old_response =
                    || Response::new().with_header(DateHeader(UNIX_EPOCH.into()));
                let very_old_response = create_very_old_response();
                let last_resp = queries_mid
                    .iter()
                    .map(|q| match &q.response {
                        Some(r) => r,
                        _ => panic!("Impossible"),
                    })
                    .fold(&very_old_response, resp_dated_later);
                RateLimitInfo::from(&last_resp)
            };
            &queries_left.append(&mut queries_mid);
        }

        queries_right = queries_left;

        if queries_right.iter().all(query_good) {
            break;
        }
    }

    println!(
        "batched_throttle: finishing. queries_right.len() {}",
        queries_right.len()
    );

    Box::new(
        queries_right
            .into_iter()
            .map(|q| q.response.unwrap())
            .collect(),
    )
}

fn bucket_count_to_req_count(bucket_count: u32) -> u32 {
    let needed_req_count = (bucket_count as f32 / MAX_REQ_SIZE as f32).ceil() as u32;
    return needed_req_count;
}

type BoxedBuckets = Box<Vec<Bucket>>;

fn response_to_buckets(response: Response<Body>) -> BoxedFuture<Vec<Bucket>> {
    Box::new(response.body().concat2().map(|body: Chunk| -> Vec<Bucket> {
        println!("body.len(): {}", body.len());
        println!("JSON: {}", str::from_utf8(&body).unwrap());
        serde_json::from_slice(&body).unwrap()
    }))
}

pub fn get_n_last(symbol: &str, bucket_count: u32) -> BoxedBuckets {
    let req_count = bucket_count_to_req_count(bucket_count);
    let uris = (0..req_count)
        .map(|page_ix| make_uri(symbol, page_ix))
        .collect();

    let responses = batched_throttle(uris);

    let mut core = Core::new().unwrap();
    let boxed_buckets = {
        let futures = responses.into_iter().map(response_to_buckets);
        let future = stream::futures_ordered(futures).collect();
        let groups_of_buckets = core.run(future).unwrap();
        Box::new(
            groups_of_buckets
                .into_iter()
                .flat_map(|bs| bs.into_iter())
                .rev()
                .collect(),
        )
    };

    return boxed_buckets;
}

You first create a Core, start a batch of requests, and collect the Response "results".

Only after you have obtained all of the Responses do you start a new Core and try to begin reading data from those Responses. By then the server has likely closed the connections long ago because of a write timeout, so you only receive the partial data that was already buffered.

You shouldn't make the server wait: start reading each Response as soon as possible.
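As a minimal sketch of that idea (using the same hyper 0.11 / futures 0.1 API as the code above; the URL is just a placeholder), chain concat2() directly onto the request future so the body is consumed inside the same Core that made the request, rather than stored for a later event loop:

```rust
extern crate futures;
extern crate hyper;
extern crate tokio_core;

use futures::{Future, Stream};
use hyper::{Chunk, Client, Method, Request};
use tokio_core::reactor::Core;

fn main() {
    let mut core = Core::new().unwrap();
    let client = Client::new(&core.handle());

    // Placeholder endpoint; substitute the real URI.
    let req = Request::new(Method::Get, "http://example.com/".parse().unwrap());

    // Read the body immediately, while the connection is still live,
    // instead of collecting `Response` values and reading them later.
    let future = client
        .request(req)
        .and_then(|resp| resp.body().concat2())
        .map(|body: Chunk| body.to_vec());

    let bytes = core.run(future).unwrap();
    println!("read {} bytes in one pass", bytes.len());
}
```

In batched_throttle, this would mean having do_batch resolve each query to its fully read body (or the deserialized Buckets) instead of to a Response, so nothing outlives the Core that produced it.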