AWS MWAA - connection timed out when trying to make a REST call to Livy

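I created an MWAA environment with the public network option (version 2.0.2). I then created a sample Airflow DAG that launches an EMR cluster with the following properties: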

    {
        'Name': 'demo-cluster-airflow',
        'ReleaseLabel': 'emr-6.2.0',
        'LogUri': 's3://path-to-log/',
        'Applications': [
            {'Name': 'spark'},
            {'Name': 'livy'},
        ],
        'Instances': {
            'InstanceGroups': [
                {
                    'Name': 'Master nodes',
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'MASTER',
                    'InstanceType': 'm5.xlarge',
                    'InstanceCount': 1,
                },
            ],
            'KeepJobFlowAliveWhenNoSteps': True,
            'TerminationProtected': False,
            'Ec2KeyName': 'key',
        },
        'VisibleToAllUsers': True,
        'JobFlowRole': 'EMR_EC2_DefaultRole',
        'ServiceRole': 'EMR_DefaultRole',
        'Tags': [
            {'Key': 'Environment', 'Value': 'Development'},
        ],
    }

cluster_creator = EmrCreateJobFlowOperator(

The cluster starts fine. Next, I call Livy from a PythonOperator using the following code:

import json
import logging

import requests


def create_spark_session(master_dns, kind='spark'):
    # 8998 is the port on which the Livy server runs
    host = 'http://' + master_dns + ':8998'
    data = {'kind': kind}
    headers = {'Content-Type': 'application/json'}
    logging.info(f"querying {host}/sessions")
    response = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
    return response.headers


[2021-06-22 18:43:19,272] {{}} INFO - querying
[2021-06-22 18:45:29,709] {{}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/urllib3/", line 160, in _new_conn
    (self._dns_host, self.port), self.timeout, **extra_kw
  File "/usr/local/lib/python3.7/site-packages/urllib3/util/", line 84, in create_connection
    raise err
  File "/usr/local/lib/python3.7/site-packages/urllib3/util/", line 74, in create_connection
TimeoutError: [Errno 110] Connection timed out

During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/urllib3/", line 677, in urlopen
      File "/usr/local/lib/python3.7/site-packages/urllib3/", line 392, in _make_request
        conn.request(method, url, **httplib_request_kw)
      File "/usr/lib64/python3.7/http/", line 1277, in request
        self._send_request(method, url, body, headers, encode_chunked)
      File "/usr/lib64/python3.7/http/", line 1323, in _send_request
        self.endheaders(body, encode_chunked=encode_chunked)
      File "/usr/lib64/python3.7/http/", line 1272, in endheaders
        self._send_output(message_body, encode_chunked=encode_chunked)
      File "/usr/lib64/python3.7/http/", line 1032, in _send_output
      File "/usr/lib64/python3.7/http/", line 972, in send
      File "/usr/local/lib/python3.7/site-packages/urllib3/", line 187, in connect
        conn = self._new_conn()
      File "/usr/local/lib/python3.7/site-packages/urllib3/", line 172, in _new_conn
        self, "Failed to establish a new connection: %s" % e
    urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7f19a79a6350>: Failed to establish a new connection: [Errno 110] Connection timed out
    During handling of the above exception, another exception occurred:
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/requests/", line 449, in send
      File "/usr/local/lib/python3.7/site-packages/urllib3/", line 727, in urlopen
        method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
      File "/usr/local/lib/python3.7/site-packages/urllib3/util/", line 446, in increment
        raise MaxRetryError(_pool, url, error or ResponseError(cause))
    urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='', port=8998): Max retries exceeded with url: /sessions (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f19a79a6350>: Failed to establish a new connection: [Errno 110] Connection timed out'))
    During handling of the above exception, another exception occurred:
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/airflow/models/", line 1138, in _run_raw_task
        self._prepare_and_execute_task_with_callbacks(context, task)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/", line 1311, in _prepare_and_execute_task_with_callbacks
        result = self._execute_task(context, task_copy)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/", line 1341, in _execute_task
        result = task_copy.execute(context=context)
      File "/usr/local/lib/python3.7/site-packages/airflow/operators/", line 117, in execute
        return_value = self.execute_callable()
      File "/usr/local/lib/python3.7/site-packages/airflow/operators/", line 128, in execute_callable
        return self.python_callable(*self.op_args, **self.op_kwargs)
      File "/usr/local/airflow/dags/", line 164, in submit_pi
        headers = create_spark_session(cluster_dns, 'spark')
      File "/usr/local/airflow/dags/", line 93, in create_spark_session
        response = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
      File "/usr/local/lib/python3.7/site-packages/requests/", line 119, in post
        return request('post', url, data=data, json=json, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/requests/", line 61, in request
        return session.request(method=method, url=url, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/requests/", line 542, in request
        resp = self.send(prep, **send_kwargs)
      File "/usr/local/lib/python3.7/site-packages/requests/", line 655, in send
        r = adapter.send(request, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/requests/", line 516, in send
        raise ConnectionError(e, request=request)
    requests.exceptions.ConnectionError: HTTPConnectionPool(host='', port=8998): Max retries exceeded with url: /sessions (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f19a79a6350>: Failed to establish a new connection: [Errno 110] Connection timed out'))
    [2021-06-22 18:45:29,795] {{}} INFO - Marking task as FAILED. dag_id=EmrDag, task_id=submit_pi, execution_date=20210622T183209, start_date=20210622T184317, end_date=20210622T184529

It looks like the problem is in the network path from MWAA to Livy, but I don't know how to solve it...
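One way to narrow this down is to test plain TCP reachability of the Livy port from inside an Airflow task, separately from the HTTP call. The sketch below is only a diagnostic aid under stated assumptions: `can_reach` is a hypothetical helper name, the 5 second timeout is an arbitrary choice, and 8998 is taken from the code above. A `False` result is consistent with a blocked or unroutable network path, but it does not say which layer is blocking.

```python
import socket


def can_reach(host: str, port: int = 8998, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        # create_connection resolves the host name and attempts a TCP handshake
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers timeouts, refused connections, and name-resolution failures
        return False
```

Called with the master DNS name from a PythonOperator, this fails within a few seconds instead of waiting for the default operating-system connect timeout that the log above appears to show (about two minutes between the two log lines).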


Ec2SubnetId = subnetid created by MWAA
EmrManagedMasterSecurityGroup = security group created by MWAA
EmrManagedSlaveSecurityGroup = security group created by MWAA
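Since the cluster reuses the subnet and security group created by MWAA, one thing that may be worth checking is whether that security group's inbound rules cover TCP port 8998 at all. The following is a minimal sketch, not a confirmed fix: `allows_tcp_port` is a hypothetical helper, and it assumes the rules are passed in the `IpPermissions` shape that boto3's `describe_security_groups` returns. It checks only protocol and port range; whether the rule's source actually includes the MWAA workers still has to be verified separately.

```python
from typing import Any, Dict, List


def allows_tcp_port(ip_permissions: List[Dict[str, Any]], port: int) -> bool:
    """Return True if any inbound rule covers the given TCP port."""
    for rule in ip_permissions:
        protocol = rule.get("IpProtocol")
        if protocol == "-1":
            # "-1" means all protocols and all ports
            return True
        if protocol != "tcp":
            continue
        from_port = rule.get("FromPort")
        to_port = rule.get("ToPort")
        if from_port is not None and to_port is not None and from_port <= port <= to_port:
            return True
    return False


# Hypothetical rule set that only opens SSH, so the Livy port is not covered
sample_rules = [
    {"IpProtocol": "tcp", "FromPort": 22, "ToPort": 22, "IpRanges": [{"CidrIp": "10.0.0.0/16"}]},
]
print(allows_tcp_port(sample_rules, 8998))  # False
```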