AWS MWAA - 尝试对 Livy 进行休息调用时连接超时
AWS MWAA - connection timed out when try do rest call to Livy
我使用 public 网络选项(版本 2.0.2)创建了 MWAA。
创建了一个气流 dag 样本,其中以下一个属性启动 emr:
JOB_FLOW_OVERRIDES = {
'Name': 'demo-cluster-airflow',
'ReleaseLabel': 'emr-6.2.0',
'LogUri': 's3://path-to-log/',
'Applications': [
{
'Name': 'spark'
},
{
'Name': 'livy'
}
],
'Instances': {
'InstanceGroups': [
{
'Name': 'Master nodes',
'Market': 'ON_DEMAND',
'InstanceRole': 'MASTER',
'InstanceType': 'm5.xlarge',
'InstanceCount': 1,
}
],
'KeepJobFlowAliveWhenNoSteps': True,
'TerminationProtected': False,
'Ec2KeyName': 'key',
},
'VisibleToAllUsers': True,
'JobFlowRole': 'EMR_EC2_DefaultRole',
'ServiceRole': 'EMR_DefaultRole',
'Tags': [
{
'Key': 'Environment',
'Value': 'Development'
}
]
}
cluster_creator = EmrCreateJobFlowOperator(
task_id='create_cluster',
job_flow_overrides=JOB_FLOW_OVERRIDES
)
启动正常,
然后,使用下一个代码
在 PythonOperator 中调用 livy
def create_spark_session(master_dns, kind='spark'):
# 8998 is the port on which the Livy server runs
host = 'http://' + master_dns + ':8998'
data = {'kind': kind}
headers = {'Content-Type': 'application/json'}
logging.info(f"querying {host}/sessions")
response = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
logging.info(response.json())
return response.headers
但是,任务失败并出现错误:
[2021-06-22 18:43:19,272] {{EmrDag.py:92}} INFO - querying http://ip-10-192-21-17.us-east-2.compute.internal:8998/sessions
[2021-06-22 18:45:29,709] {{taskinstance.py:1482}} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/connection.py", line 160, in _new_conn
(self._dns_host, self.port), self.timeout, **extra_kw
File "/usr/local/lib/python3.7/site-packages/urllib3/util/connection.py", line 84, in create_connection
raise err
File "/usr/local/lib/python3.7/site-packages/urllib3/util/connection.py", line 74, in create_connection
sock.connect(sa)
TimeoutError: [Errno 110] Connection timed out
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 677, in urlopen
chunked=chunked,
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 392, in _make_request
conn.request(method, url, **httplib_request_kw)
File "/usr/lib64/python3.7/http/client.py", line 1277, in request
self._send_request(method, url, body, headers, encode_chunked)
File "/usr/lib64/python3.7/http/client.py", line 1323, in _send_request
self.endheaders(body, encode_chunked=encode_chunked)
File "/usr/lib64/python3.7/http/client.py", line 1272, in endheaders
self._send_output(message_body, encode_chunked=encode_chunked)
File "/usr/lib64/python3.7/http/client.py", line 1032, in _send_output
self.send(msg)
File "/usr/lib64/python3.7/http/client.py", line 972, in send
self.connect()
File "/usr/local/lib/python3.7/site-packages/urllib3/connection.py", line 187, in connect
conn = self._new_conn()
File "/usr/local/lib/python3.7/site-packages/urllib3/connection.py", line 172, in _new_conn
self, "Failed to establish a new connection: %s" % e
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7f19a79a6350>: Failed to establish a new connection: [Errno 110] Connection timed out
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
timeout=timeout
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 727, in urlopen
method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
File "/usr/local/lib/python3.7/site-packages/urllib3/util/retry.py", line 446, in increment
raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='ip-10-192-21-17.us-east-2.compute.internal', port=8998): Max retries exceeded with url: /sessions (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f19a79a6350>: Failed to establish a new connection: [Errno 110] Connection timed out'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 117, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 128, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/usr/local/airflow/dags/EmrDag.py", line 164, in submit_pi
headers = create_spark_session(cluster_dns, 'spark')
File "/usr/local/airflow/dags/EmrDag.py", line 93, in create_spark_session
response = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
File "/usr/local/lib/python3.7/site-packages/requests/api.py", line 119, in post
return request('post', url, data=data, json=json, **kwargs)
File "/usr/local/lib/python3.7/site-packages/requests/api.py", line 61, in request
return session.request(method=method, url=url, **kwargs)
File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 542, in request
resp = self.send(prep, **send_kwargs)
File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 655, in send
r = adapter.send(request, **kwargs)
File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 516, in send
raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPConnectionPool(host='ip-10-192-21-17.us-east-2.compute.internal', port=8998): Max retries exceeded with url: /sessions (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f19a79a6350>: Failed to establish a new connection: [Errno 110] Connection timed out'))
[2021-06-22 18:45:29,795] {{taskinstance.py:1532}} INFO - Marking task as FAILED. dag_id=EmrDag, task_id=submit_pi, execution_date=20210622T183209, start_date=20210622T184317, end_date=20210622T184529
看起来问题出在从 MWAA 到 Livy 的网络中,但不知道如何解决...
问题已解决设置:
Ec2SubnetId = subnetid created by MWAA
EmrManagedMasterSecurityGroup = security group created by MWAA
EmrManagedSlaveSecurityGroup = security group created by MWAA
在JOB_FLOW_OVERRIDES
我使用 public 网络选项(版本 2.0.2)创建了 MWAA。 创建了一个气流 dag 样本,其中以下一个属性启动 emr:
JOB_FLOW_OVERRIDES = {
'Name': 'demo-cluster-airflow',
'ReleaseLabel': 'emr-6.2.0',
'LogUri': 's3://path-to-log/',
'Applications': [
{
'Name': 'spark'
},
{
'Name': 'livy'
}
],
'Instances': {
'InstanceGroups': [
{
'Name': 'Master nodes',
'Market': 'ON_DEMAND',
'InstanceRole': 'MASTER',
'InstanceType': 'm5.xlarge',
'InstanceCount': 1,
}
],
'KeepJobFlowAliveWhenNoSteps': True,
'TerminationProtected': False,
'Ec2KeyName': 'key',
},
'VisibleToAllUsers': True,
'JobFlowRole': 'EMR_EC2_DefaultRole',
'ServiceRole': 'EMR_DefaultRole',
'Tags': [
{
'Key': 'Environment',
'Value': 'Development'
}
]
}
cluster_creator = EmrCreateJobFlowOperator(
task_id='create_cluster',
job_flow_overrides=JOB_FLOW_OVERRIDES
)
启动正常, 然后,使用下一个代码
在 PythonOperator 中调用 livydef create_spark_session(master_dns, kind='spark'):
# 8998 is the port on which the Livy server runs
host = 'http://' + master_dns + ':8998'
data = {'kind': kind}
headers = {'Content-Type': 'application/json'}
logging.info(f"querying {host}/sessions")
response = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
logging.info(response.json())
return response.headers
但是,任务失败并出现错误:
[2021-06-22 18:43:19,272] {{EmrDag.py:92}} INFO - querying http://ip-10-192-21-17.us-east-2.compute.internal:8998/sessions
[2021-06-22 18:45:29,709] {{taskinstance.py:1482}} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/connection.py", line 160, in _new_conn
(self._dns_host, self.port), self.timeout, **extra_kw
File "/usr/local/lib/python3.7/site-packages/urllib3/util/connection.py", line 84, in create_connection
raise err
File "/usr/local/lib/python3.7/site-packages/urllib3/util/connection.py", line 74, in create_connection
sock.connect(sa)
TimeoutError: [Errno 110] Connection timed out
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 677, in urlopen
chunked=chunked,
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 392, in _make_request
conn.request(method, url, **httplib_request_kw)
File "/usr/lib64/python3.7/http/client.py", line 1277, in request
self._send_request(method, url, body, headers, encode_chunked)
File "/usr/lib64/python3.7/http/client.py", line 1323, in _send_request
self.endheaders(body, encode_chunked=encode_chunked)
File "/usr/lib64/python3.7/http/client.py", line 1272, in endheaders
self._send_output(message_body, encode_chunked=encode_chunked)
File "/usr/lib64/python3.7/http/client.py", line 1032, in _send_output
self.send(msg)
File "/usr/lib64/python3.7/http/client.py", line 972, in send
self.connect()
File "/usr/local/lib/python3.7/site-packages/urllib3/connection.py", line 187, in connect
conn = self._new_conn()
File "/usr/local/lib/python3.7/site-packages/urllib3/connection.py", line 172, in _new_conn
self, "Failed to establish a new connection: %s" % e
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7f19a79a6350>: Failed to establish a new connection: [Errno 110] Connection timed out
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
timeout=timeout
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 727, in urlopen
method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
File "/usr/local/lib/python3.7/site-packages/urllib3/util/retry.py", line 446, in increment
raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='ip-10-192-21-17.us-east-2.compute.internal', port=8998): Max retries exceeded with url: /sessions (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f19a79a6350>: Failed to establish a new connection: [Errno 110] Connection timed out'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 117, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 128, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/usr/local/airflow/dags/EmrDag.py", line 164, in submit_pi
headers = create_spark_session(cluster_dns, 'spark')
File "/usr/local/airflow/dags/EmrDag.py", line 93, in create_spark_session
response = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
File "/usr/local/lib/python3.7/site-packages/requests/api.py", line 119, in post
return request('post', url, data=data, json=json, **kwargs)
File "/usr/local/lib/python3.7/site-packages/requests/api.py", line 61, in request
return session.request(method=method, url=url, **kwargs)
File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 542, in request
resp = self.send(prep, **send_kwargs)
File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 655, in send
r = adapter.send(request, **kwargs)
File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 516, in send
raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPConnectionPool(host='ip-10-192-21-17.us-east-2.compute.internal', port=8998): Max retries exceeded with url: /sessions (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f19a79a6350>: Failed to establish a new connection: [Errno 110] Connection timed out'))
[2021-06-22 18:45:29,795] {{taskinstance.py:1532}} INFO - Marking task as FAILED. dag_id=EmrDag, task_id=submit_pi, execution_date=20210622T183209, start_date=20210622T184317, end_date=20210622T184529
看起来问题出在从 MWAA 到 Livy 的网络中,但不知道如何解决...
问题已解决设置:
Ec2SubnetId = subnetid created by MWAA
EmrManagedMasterSecurityGroup = security group created by MWAA
EmrManagedSlaveSecurityGroup = security group created by MWAA
在JOB_FLOW_OVERRIDES