AWS MWAA - connection timed out when making a REST call to Livy

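I created an MWAA environment with the public network option (version 2.0.2). I then created a sample Airflow DAG that launches an EMR cluster with the following properties: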

JOB_FLOW_OVERRIDES = {
    'Name': 'demo-cluster-airflow',
    'ReleaseLabel': 'emr-6.2.0',
    'LogUri': 's3://path-to-log/',
    'Applications': [
        {
            'Name': 'spark'
        },
        {
            'Name': 'livy'
        }
    ],
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1,
            }
        ],
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
        'Ec2KeyName': 'key',
    },
    'VisibleToAllUsers': True,
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
    'Tags': [
        {
            'Key': 'Environment',
            'Value': 'Development'
        }
    ]
}

cluster_creator = EmrCreateJobFlowOperator(
    task_id='create_cluster',
    job_flow_overrides=JOB_FLOW_OVERRIDES
)

The cluster starts fine. Then I call Livy from a PythonOperator using the following code:

import json
import logging

import requests


def create_spark_session(master_dns, kind='spark'):
    # 8998 is the port on which the Livy server runs
    host = 'http://' + master_dns + ':8998'
    data = {'kind': kind}
    headers = {'Content-Type': 'application/json'}
    logging.info(f"querying {host}/sessions")
    # POST /sessions asks Livy to start a new interactive session
    response = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
    logging.info(response.json())
    return response.headers

However, the task fails with the following error:

[2021-06-22 18:43:19,272] {{EmrDag.py:92}} INFO - querying http://ip-10-192-21-17.us-east-2.compute.internal:8998/sessions
[2021-06-22 18:45:29,709] {{taskinstance.py:1482}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/urllib3/connection.py", line 160, in _new_conn
    (self._dns_host, self.port), self.timeout, **extra_kw
  File "/usr/local/lib/python3.7/site-packages/urllib3/util/connection.py", line 84, in create_connection
    raise err
  File "/usr/local/lib/python3.7/site-packages/urllib3/util/connection.py", line 74, in create_connection
    sock.connect(sa)
TimeoutError: [Errno 110] Connection timed out

During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 677, in urlopen
        chunked=chunked,
      File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 392, in _make_request
        conn.request(method, url, **httplib_request_kw)
      File "/usr/lib64/python3.7/http/client.py", line 1277, in request
        self._send_request(method, url, body, headers, encode_chunked)
      File "/usr/lib64/python3.7/http/client.py", line 1323, in _send_request
        self.endheaders(body, encode_chunked=encode_chunked)
      File "/usr/lib64/python3.7/http/client.py", line 1272, in endheaders
        self._send_output(message_body, encode_chunked=encode_chunked)
      File "/usr/lib64/python3.7/http/client.py", line 1032, in _send_output
        self.send(msg)
      File "/usr/lib64/python3.7/http/client.py", line 972, in send
        self.connect()
      File "/usr/local/lib/python3.7/site-packages/urllib3/connection.py", line 187, in connect
        conn = self._new_conn()
      File "/usr/local/lib/python3.7/site-packages/urllib3/connection.py", line 172, in _new_conn
        self, "Failed to establish a new connection: %s" % e
    urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7f19a79a6350>: Failed to establish a new connection: [Errno 110] Connection timed out
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
        timeout=timeout
      File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 727, in urlopen
        method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
      File "/usr/local/lib/python3.7/site-packages/urllib3/util/retry.py", line 446, in increment
        raise MaxRetryError(_pool, url, error or ResponseError(cause))
    urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='ip-10-192-21-17.us-east-2.compute.internal', port=8998): Max retries exceeded with url: /sessions (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f19a79a6350>: Failed to establish a new connection: [Errno 110] Connection timed out'))
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
        self._prepare_and_execute_task_with_callbacks(context, task)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
        result = self._execute_task(context, task_copy)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
        result = task_copy.execute(context=context)
      File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 117, in execute
        return_value = self.execute_callable()
      File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 128, in execute_callable
        return self.python_callable(*self.op_args, **self.op_kwargs)
      File "/usr/local/airflow/dags/EmrDag.py", line 164, in submit_pi
        headers = create_spark_session(cluster_dns, 'spark')
      File "/usr/local/airflow/dags/EmrDag.py", line 93, in create_spark_session
        response = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
      File "/usr/local/lib/python3.7/site-packages/requests/api.py", line 119, in post
        return request('post', url, data=data, json=json, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/requests/api.py", line 61, in request
        return session.request(method=method, url=url, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 542, in request
        resp = self.send(prep, **send_kwargs)
      File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 655, in send
        r = adapter.send(request, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 516, in send
        raise ConnectionError(e, request=request)
    requests.exceptions.ConnectionError: HTTPConnectionPool(host='ip-10-192-21-17.us-east-2.compute.internal', port=8998): Max retries exceeded with url: /sessions (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f19a79a6350>: Failed to establish a new connection: [Errno 110] Connection timed out'))
    [2021-06-22 18:45:29,795] {{taskinstance.py:1532}} INFO - Marking task as FAILED. dag_id=EmrDag, task_id=submit_pi, execution_date=20210622T183209, start_date=20210622T184317, end_date=20210622T184529

It looks like the problem is in the network path from MWAA to Livy, but I don't know how to fix it.
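
One way to check whether this is plain network reachability rather than a Livy problem is a short TCP probe run from an Airflow task. The following is only a minimal sketch: the helper name `can_connect` and the 5-second default timeout are assumptions made for illustration, not part of the original DAG.

```python
import socket


def can_connect(host, port, timeout=5.0):
    """Return True if a TCP connection to host:port opens within `timeout` seconds."""
    try:
        # create_connection resolves the host name and attempts a TCP connect
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers timeouts, refused connections and DNS resolution failures
        return False
```

Calling `can_connect(master_dns, 8998)` inside the PythonOperator before posting to Livy would return False after a few seconds instead of waiting about two minutes for the OS-level connect timeout seen in the log above.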

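The issue was resolved by setting the following in the 'Instances' section of JOB_FLOW_OVERRIDES:

Ec2SubnetId = subnet ID created by MWAA
EmrManagedMasterSecurityGroup = security group created by MWAA
EmrManagedSlaveSecurityGroup = security group created by MWAA

With the cluster launched in the same subnet and security group as MWAA, the workers could reach Livy on port 8998.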
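
The fix above can be sketched as a small helper that patches the network settings into the overrides. This is an illustration under assumptions: the helper name `with_mwaa_network` is hypothetical, the subnet and security group IDs are placeholders to be replaced with the ones created by MWAA, and the overrides dict is trimmed for brevity. The three keys belong under 'Instances', as in the EMR RunJobFlow request.

```python
import copy


def with_mwaa_network(overrides, subnet_id, security_group_id):
    """Return a copy of `overrides` whose 'Instances' block uses the given subnet and security group."""
    patched = copy.deepcopy(overrides)  # leave the caller's dict untouched
    instances = patched.setdefault('Instances', {})
    instances['Ec2SubnetId'] = subnet_id
    instances['EmrManagedMasterSecurityGroup'] = security_group_id
    instances['EmrManagedSlaveSecurityGroup'] = security_group_id
    return patched


# Trimmed version of the overrides from the question
BASE_OVERRIDES = {
    'Name': 'demo-cluster-airflow',
    'ReleaseLabel': 'emr-6.2.0',
    'Instances': {
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
    },
}

# Placeholder IDs (assumptions): use the subnet and security group created by MWAA
JOB_FLOW_OVERRIDES = with_mwaa_network(
    BASE_OVERRIDES,
    subnet_id='subnet-0123456789abcdef0',
    security_group_id='sg-0123456789abcdef0',
)
```

The resulting JOB_FLOW_OVERRIDES can be passed to EmrCreateJobFlowOperator exactly as before.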