如何使用 AirFlow 提取使用 Apache Livy 批处理 POST 方法提交的 Spark 作业客户端日志
How to pull Spark jobs client logs submitted using Apache Livy batches POST method using AirFlow
我正在使用 Apache Livy 批处理 POST 方法提交 Spark 作业。
此 HTTP 请求是使用 AirFlow 发送的。提交作业后,我正在使用批次 ID 跟踪状态。
我想在 Air Flow 日志上显示驱动程序(客户端日志)日志,以避免转到多个地方 AirFLow 和 Apache Livy/Resource Manager。
是否可以使用 Apache Livy REST API?
Livy 有一个端点来获取日志 /sessions/{sessionId}/log
& /batches/{batchId}/log
。
文档:
- https://livy.incubator.apache.org/docs/latest/rest-api.html#get-sessionssessionidlog
- https://livy.incubator.apache.org/docs/latest/rest-api.html#get-batchesbatchidlog
您可以创建如下所示的 python 函数来获取日志:
http = HttpHook("GET", http_conn_id=http_conn_id)
def _http_rest_call(self, method, endpoint, data=None, headers=None, extra_options=None):
if not extra_options:
extra_options = {}
self.http.method = method
response = http.run(endpoint, json.dumps(data), headers, extra_options=extra_options)
return response
def _get_batch_session_logs(self, batch_id):
method = "GET"
endpoint = "batches/" + str(batch_id) + "/log"
response = self._http_rest_call(method=method, endpoint=endpoint)
# return response.json()
return response
Livy 通过两种方式公开 REST API:会话和批处理。在您的情况下,由于我们假设您没有使用会话,因此您正在使用批次提交。您可以使用 curl 命令 post 您的批次:
卷曲http://livy-server-IP:8998/batches
提交作业后,您将在 return 中获得批次 ID。然后你可以使用命令卷曲:
curl http://livy-server-IP:8998/batches/{batchId}/log
您可以在以下位置找到文档:
https://livy.incubator.apache.org/docs/latest/rest-api.html
如果您想避免上述步骤,您可以使用 AWS Marketplace 中的现成 AMI(即 LightningFLow),它为 Airflow 提供自定义 Livy 运算符。 Livy 操作员每 30 秒(可配置)提交并跟踪作业的状态,它还在 Airflow UI 日志中的火花作业结束时提供火花日志。
注意:LightningFlow 预先集成了所有必需的库、Livy、自定义运算符和本地 Spark 集群。
Link 对于 AWS Marketplace:
https://aws.amazon.com/marketplace/pp/Lightning-Analytics-Inc-LightningFlow-Integrated-o/B084BSD66V
这将使您能够在一个地方查看整合日志,而不是在 Airflow 和 EMR/Spark 日志(Ambari/Resource 管理器)之间来回切换。
我正在使用 Apache Livy 批处理 POST 方法提交 Spark 作业。
此 HTTP 请求是使用 AirFlow 发送的。提交作业后,我正在使用批次 ID 跟踪状态。
我想在 Air Flow 日志上显示驱动程序(客户端日志)日志,以避免转到多个地方 AirFLow 和 Apache Livy/Resource Manager。
是否可以使用 Apache Livy REST API?
Livy 有一个端点来获取日志 /sessions/{sessionId}/log
& /batches/{batchId}/log
。
文档:
- https://livy.incubator.apache.org/docs/latest/rest-api.html#get-sessionssessionidlog
- https://livy.incubator.apache.org/docs/latest/rest-api.html#get-batchesbatchidlog
您可以创建如下所示的 python 函数来获取日志:
http = HttpHook("GET", http_conn_id=http_conn_id)
def _http_rest_call(self, method, endpoint, data=None, headers=None, extra_options=None):
if not extra_options:
extra_options = {}
self.http.method = method
response = http.run(endpoint, json.dumps(data), headers, extra_options=extra_options)
return response
def _get_batch_session_logs(self, batch_id):
method = "GET"
endpoint = "batches/" + str(batch_id) + "/log"
response = self._http_rest_call(method=method, endpoint=endpoint)
# return response.json()
return response
Livy 通过两种方式公开 REST API:会话和批处理。在您的情况下,由于我们假设您没有使用会话,因此您正在使用批次提交。您可以使用 curl 命令 post 您的批次:
卷曲http://livy-server-IP:8998/batches
提交作业后,您将在 return 中获得批次 ID。然后你可以使用命令卷曲:
curl http://livy-server-IP:8998/batches/{batchId}/log
您可以在以下位置找到文档: https://livy.incubator.apache.org/docs/latest/rest-api.html
如果您想避免上述步骤,您可以使用 AWS Marketplace 中的现成 AMI(即 LightningFLow),它为 Airflow 提供自定义 Livy 运算符。 Livy 操作员每 30 秒(可配置)提交并跟踪作业的状态,它还在 Airflow UI 日志中的火花作业结束时提供火花日志。
注意:LightningFlow 预先集成了所有必需的库、Livy、自定义运算符和本地 Spark 集群。
Link 对于 AWS Marketplace: https://aws.amazon.com/marketplace/pp/Lightning-Analytics-Inc-LightningFlow-Integrated-o/B084BSD66V
这将使您能够在一个地方查看整合日志,而不是在 Airflow 和 EMR/Spark 日志(Ambari/Resource 管理器)之间来回切换。