DeepAR 预测中的 Sagemaker 端点 BrokenPipeError

Sagemaker Endpoint BrokenPipeError at DeepAR Prediction

我使用以下代码从经过训练的 DeepAR 模型创建了一个 SageMaker 端点:

# Name of the most recent training job run by the estimator.
job_name = estimator.latest_training_job.job_name

# Arguments for creating the endpoint from that training job, gathered in one
# place so the call below stays short.
endpoint_config = {
    "job_name": job_name,
    "initial_instance_count": 1,
    "instance_type": "ml.m4.xlarge",
    "image_uri": image_uri,
    "role": role,
}

# The returned value is used later as the predictor's ``endpoint_name``.
endpoint_name = sagemaker_session.endpoint_from_job(**endpoint_config)

现在我想使用 test.json 数据集(66.2MB)测试我的模型。我根据各种教程和示例笔记本(tutorials/sample-notebooks)创建了该文件(与 train.json 相同,但少了 prediction_length 个值)。

为此,我编写了以下代码:

class DeepARPredictor(sagemaker.predictor.Predictor):
    """Predictor that sends pandas time series to a DeepAR endpoint as JSON.

    ``set_prediction_parameters`` must be called before ``predict``, because
    ``predict`` reads the ``freq`` and ``prediction_length`` attributes it sets.
    """

    def set_prediction_parameters(self, freq, prediction_length):
        """Store the settings used to build the index of each forecast.

        Args:
            freq: Frequency string; it is passed both to
                ``pd.Timedelta(1, unit=freq)`` and ``pd.date_range(freq=freq)``.
            prediction_length: Number of periods in each forecast index.
        """
        self.freq = freq
        self.prediction_length = prediction_length

    def predict(self, ts, num_samples=100, quantiles=("0.1", "0.5", "0.9"), batch_size=None):
        """Request quantile forecasts for a sequence of time series.

        Args:
            ts: Sequence (supporting ``len`` and slicing, e.g. a list) of
                time-indexed pandas Series.
            num_samples: Forwarded as ``configuration.num_samples``.
            quantiles: Quantile levels (as strings) to request. The default is
                an immutable tuple so it cannot be mutated between calls.
            batch_size: Optional maximum number of series per request. With
                the default ``None`` every series is sent in one request, as
                before. Endpoint invocations have a maximum request body size
                (see the InvokeEndpoint API reference); an oversized request
                can fail with ``BrokenPipeError`` / ``ConnectionClosedError``,
                so set this to split a large input into several smaller
                requests.

        Returns:
            A list with one DataFrame per input series, in input order. Each
            frame holds the returned quantiles, indexed from the first
            timestamp after the end of the corresponding series.

        Raises:
            ValueError: If ``batch_size`` is given but smaller than 1.
        """
        if batch_size is None:
            # Unchanged default behaviour: one request carrying every series.
            return self.__predict_batch(ts, num_samples, quantiles)
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer or None")

        list_of_df = []
        for start in range(0, len(ts), batch_size):
            batch = ts[start:start + batch_size]
            list_of_df.extend(self.__predict_batch(batch, num_samples, quantiles))
        return list_of_df

    def __predict_batch(self, ts, num_samples, quantiles):
        """Send one request for ``ts`` and decode the response."""
        # Each forecast starts one ``freq`` step after the last observation.
        prediction_times = [x.index[-1] + pd.Timedelta(1, unit=self.freq) for x in ts]
        req = self.__encode_request(ts, num_samples, quantiles)
        res = super().predict(req, initial_args={"ContentType": "application/json"})
        return self.__decode_response(res, prediction_times)

    def __encode_request(self, ts, num_samples, quantiles):
        """Serialize the series and configuration into a UTF-8 JSON body."""
        instances = [
            {"start": str(series.index[0]), "target": list(series)} for series in ts
        ]
        configuration = {
            "num_samples": num_samples,
            "output_types": ["quantiles"],
            # Accept any iterable of levels (the default is a tuple).
            "quantiles": list(quantiles),
        }
        http_request_data = {"instances": instances, "configuration": configuration}
        return json.dumps(http_request_data).encode("utf-8")

    def __decode_response(self, response, prediction_times):
        """Turn the JSON response into one DataFrame per requested series.

        Args:
            response: UTF-8 encoded JSON bytes with a ``"predictions"`` list
                whose items each contain a ``"quantiles"`` entry.
            prediction_times: First forecast timestamp for each series, in
                the same order as the predictions.
        """
        predictions = json.loads(response.decode("utf-8"))["predictions"]
        list_of_df = []
        # Index by position (rather than zip) so a response with too few
        # predictions still raises IndexError instead of being truncated.
        for k, start_time in enumerate(prediction_times):
            prediction_index = pd.date_range(
                start=start_time, freq=self.freq, periods=self.prediction_length
            )
            list_of_df.append(
                pd.DataFrame(data=predictions[k]["quantiles"], index=prediction_index)
            )
        return list_of_df

但在运行以下代码块之后:

# Point the custom DeepARPredictor at the endpoint created earlier.
predictor = DeepARPredictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker_session)
# Must run before predict(), which reads the stored freq and prediction_length.
predictor.set_prediction_parameters(freq, prediction_length)
# Requests forecasts for every series in time_series_training; this is the
# call that raises in the traceback below.
list_of_df = predictor.predict(time_series_training)

我收到 BrokenPipeError:

---------------------------------------------------------------------------
BrokenPipeError                           Traceback (most recent call last)
~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
    676                 headers=headers,
--> 677                 chunked=chunked,
    678             )

~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
    391         else:
--> 392             conn.request(method, url, **httplib_request_kw)
    393 

~/anaconda3/envs/python3/lib/python3.6/http/client.py in request(self, method, url, body, headers, encode_chunked)
   1261         """Send a complete request to the server."""
-> 1262         self._send_request(method, url, body, headers, encode_chunked)
   1263 

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/awsrequest.py in _send_request(self, method, url, body, headers, *args, **kwargs)
     92         rval = super(AWSConnection, self)._send_request(
---> 93             method, url, body, headers, *args, **kwargs)
     94         self._expect_header_set = False

~/anaconda3/envs/python3/lib/python3.6/http/client.py in _send_request(self, method, url, body, headers, encode_chunked)
   1307             body = _encode(body, 'body')
-> 1308         self.endheaders(body, encode_chunked=encode_chunked)
   1309 

~/anaconda3/envs/python3/lib/python3.6/http/client.py in endheaders(self, message_body, encode_chunked)
   1256             raise CannotSendHeader()
-> 1257         self._send_output(message_body, encode_chunked=encode_chunked)
   1258 

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/awsrequest.py in _send_output(self, message_body, *args, **kwargs)
    119             message_body = None
--> 120         self.send(msg)
    121         if self._expect_header_set:

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/awsrequest.py in send(self, str)
    203             return
--> 204         return super(AWSConnection, self).send(str)
    205 

~/anaconda3/envs/python3/lib/python3.6/http/client.py in send(self, data)
    995         try:
--> 996             self.sock.sendall(data)
    997         except TypeError:

~/anaconda3/envs/python3/lib/python3.6/ssl.py in sendall(self, data, flags)
    974                 while count < amount:
--> 975                     v = self.send(byte_view[count:])
    976                     count += v

~/anaconda3/envs/python3/lib/python3.6/ssl.py in send(self, data, flags)
    943                     self.__class__)
--> 944             return self._sslobj.write(data)
    945         else:

~/anaconda3/envs/python3/lib/python3.6/ssl.py in write(self, data)
    641         """
--> 642         return self._sslobj.write(data)
    643 

BrokenPipeError: [Errno 32] Broken pipe

During handling of the above exception, another exception occurred:

ProtocolError                             Traceback (most recent call last)
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/httpsession.py in send(self, request)
    319                 decode_content=False,
--> 320                 chunked=self._chunked(request.headers),
    321             )

~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
    726             retries = retries.increment(
--> 727                 method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
    728             )

~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/util/retry.py in increment(self, method, url, response, error, _pool, _stacktrace)
    378             # Disabled, indicate to re-raise the error.
--> 379             raise six.reraise(type(error), error, _stacktrace)
    380 

~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/packages/six.py in reraise(tp, value, tb)
    733             if value.__traceback__ is not tb:
--> 734                 raise value.with_traceback(tb)
    735             raise value

~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
    676                 headers=headers,
--> 677                 chunked=chunked,
    678             )

~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
    391         else:
--> 392             conn.request(method, url, **httplib_request_kw)
    393 

~/anaconda3/envs/python3/lib/python3.6/http/client.py in request(self, method, url, body, headers, encode_chunked)
   1261         """Send a complete request to the server."""
-> 1262         self._send_request(method, url, body, headers, encode_chunked)
   1263 

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/awsrequest.py in _send_request(self, method, url, body, headers, *args, **kwargs)
     92         rval = super(AWSConnection, self)._send_request(
---> 93             method, url, body, headers, *args, **kwargs)
     94         self._expect_header_set = False

~/anaconda3/envs/python3/lib/python3.6/http/client.py in _send_request(self, method, url, body, headers, encode_chunked)
   1307             body = _encode(body, 'body')
-> 1308         self.endheaders(body, encode_chunked=encode_chunked)
   1309 

~/anaconda3/envs/python3/lib/python3.6/http/client.py in endheaders(self, message_body, encode_chunked)
   1256             raise CannotSendHeader()
-> 1257         self._send_output(message_body, encode_chunked=encode_chunked)
   1258 

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/awsrequest.py in _send_output(self, message_body, *args, **kwargs)
    119             message_body = None
--> 120         self.send(msg)
    121         if self._expect_header_set:

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/awsrequest.py in send(self, str)
    203             return
--> 204         return super(AWSConnection, self).send(str)
    205 

~/anaconda3/envs/python3/lib/python3.6/http/client.py in send(self, data)
    995         try:
--> 996             self.sock.sendall(data)
    997         except TypeError:

~/anaconda3/envs/python3/lib/python3.6/ssl.py in sendall(self, data, flags)
    974                 while count < amount:
--> 975                     v = self.send(byte_view[count:])
    976                     count += v

~/anaconda3/envs/python3/lib/python3.6/ssl.py in send(self, data, flags)
    943                     self.__class__)
--> 944             return self._sslobj.write(data)
    945         else:

~/anaconda3/envs/python3/lib/python3.6/ssl.py in write(self, data)
    641         """
--> 642         return self._sslobj.write(data)
    643 

ProtocolError: ('Connection aborted.', BrokenPipeError(32, 'Broken pipe'))

During handling of the above exception, another exception occurred:

ConnectionClosedError                     Traceback (most recent call last)
<ipython-input-14-95dda20e8a70> in <module>
      1 predictor = DeepARPredictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker_session)
      2 predictor.set_prediction_parameters(freq, prediction_length)
----> 3 list_of_df = predictor.predict(time_series_training)

<ipython-input-13-a0fbac2b9b07> in predict(self, ts, num_samples, quantiles)
      7         prediction_times = [x.index[-1] + pd.Timedelta(1, unit=self.freq) for x in ts]
      8         req = self.__encode_request(ts, num_samples, quantiles)
----> 9         res = super(DeepARPredictor, self).predict(req, initial_args={"ContentType": "application/json"})
     10         return self.__decode_response(res, prediction_times)
     11 

~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/predictor.py in predict(self, data, initial_args, target_model, target_variant)
    123 
    124         request_args = self._create_request_args(data, initial_args, target_model, target_variant)
--> 125         response = self.sagemaker_session.sagemaker_runtime_client.invoke_endpoint(**request_args)
    126         return self._handle_response(response)
    127 

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/client.py in _api_call(self, *args, **kwargs)
    355                     "%s() only accepts keyword arguments." % py_operation_name)
    356             # The "self" in this scope is referring to the BaseClient.
--> 357             return self._make_api_call(operation_name, kwargs)
    358 
    359         _api_call.__name__ = str(py_operation_name)

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/client.py in _make_api_call(self, operation_name, api_params)
    661         else:
    662             http, parsed_response = self._make_request(
--> 663                 operation_model, request_dict, request_context)
    664 
    665         self.meta.events.emit(

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/client.py in _make_request(self, operation_model, request_dict, request_context)
    680     def _make_request(self, operation_model, request_dict, request_context):
    681         try:
--> 682             return self._endpoint.make_request(operation_model, request_dict)
    683         except Exception as e:
    684             self.meta.events.emit(

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/endpoint.py in make_request(self, operation_model, request_dict)
    100         logger.debug("Making request for %s with params: %s",
    101                      operation_model, request_dict)
--> 102         return self._send_request(request_dict, operation_model)
    103 
    104     def create_request(self, params, operation_model=None):

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/endpoint.py in _send_request(self, request_dict, operation_model)
    135             request, operation_model, context)
    136         while self._needs_retry(attempts, operation_model, request_dict,
--> 137                                 success_response, exception):
    138             attempts += 1
    139             # If there is a stream associated with the request, we need

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/endpoint.py in _needs_retry(self, attempts, operation_model, request_dict, response, caught_exception)
    254             event_name, response=response, endpoint=self,
    255             operation=operation_model, attempts=attempts,
--> 256             caught_exception=caught_exception, request_dict=request_dict)
    257         handler_response = first_non_none_response(responses)
    258         if handler_response is None:

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/hooks.py in emit(self, event_name, **kwargs)
    354     def emit(self, event_name, **kwargs):
    355         aliased_event_name = self._alias_event_name(event_name)
--> 356         return self._emitter.emit(aliased_event_name, **kwargs)
    357 
    358     def emit_until_response(self, event_name, **kwargs):

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/hooks.py in emit(self, event_name, **kwargs)
    226                  handlers.
    227         """
--> 228         return self._emit(event_name, kwargs)
    229 
    230     def emit_until_response(self, event_name, **kwargs):

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/hooks.py in _emit(self, event_name, kwargs, stop_on_response)
    209         for handler in handlers_to_call:
    210             logger.debug('Event %s: calling handler %s', event_name, handler)
--> 211             response = handler(**kwargs)
    212             responses.append((handler, response))
    213             if stop_on_response and response is not None:

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/retryhandler.py in __call__(self, attempts, response, caught_exception, **kwargs)
    181 
    182         """
--> 183         if self._checker(attempts, response, caught_exception):
    184             result = self._action(attempts=attempts)
    185             logger.debug("Retry needed, action of: %s", result)

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/retryhandler.py in __call__(self, attempt_number, response, caught_exception)
    249     def __call__(self, attempt_number, response, caught_exception):
    250         should_retry = self._should_retry(attempt_number, response,
--> 251                                           caught_exception)
    252         if should_retry:
    253             if attempt_number >= self._max_attempts:

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/retryhandler.py in _should_retry(self, attempt_number, response, caught_exception)
    275             # If we've exceeded the max attempts we just let the exception
    276             # propogate if one has occurred.
--> 277             return self._checker(attempt_number, response, caught_exception)
    278 
    279 

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/retryhandler.py in __call__(self, attempt_number, response, caught_exception)
    315         for checker in self._checkers:
    316             checker_response = checker(attempt_number, response,
--> 317                                        caught_exception)
    318             if checker_response:
    319                 return checker_response

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/retryhandler.py in __call__(self, attempt_number, response, caught_exception)
    221         elif caught_exception is not None:
    222             return self._check_caught_exception(
--> 223                 attempt_number, caught_exception)
    224         else:
    225             raise ValueError("Both response and caught_exception are None.")

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/retryhandler.py in _check_caught_exception(self, attempt_number, caught_exception)
    357         # the MaxAttemptsDecorator is not interested in retrying the exception
    358         # then this exception just propogates out past the retry code.
--> 359         raise caught_exception

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/endpoint.py in _do_get_response(self, request, operation_model)
    198             http_response = first_non_none_response(responses)
    199             if http_response is None:
--> 200                 http_response = self._send(request)
    201         except HTTPClientError as e:
    202             return (None, e)

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/endpoint.py in _send(self, request)
    267 
    268     def _send(self, request):
--> 269         return self.http_session.send(request)
    270 
    271 

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/httpsession.py in send(self, request)
    349                 error=e,
    350                 request=request,
--> 351                 endpoint_url=request.url
    352             )
    353         except Exception as e:

ConnectionClosedError: Connection was closed before we received a valid response from endpoint URL

有人知道为什么会这样吗?

我认为 Tarun 的思路可能是对的。当连接被突然关闭时,就会抛出您遇到的 BrokenPipeError,参见 Python 文档中关于 BrokenPipeError 的说明。一旦请求超过 5MB 的限制,SageMaker 端点可能就会断开连接。我建议您尝试使用较小的数据集。此外,根据类似问题中的这条评论(this comment),由于 sagemaker.tensorflow.model.TensorFlowPredictor 对数据进行编码的方式,您实际发送的数据可能会比原始数据更大。

如果这不起作用,我还看到有些人遇到的是一般性的网络问题,特别是防火墙/杀毒软件(例如这条评论)或网络超时。

希望这能为您指明正确的方向。