Get all read/write queries made in a Spark job using a Spark listener
I want to get a list of all the read/write queries that have been made in the current Spark job (using the Dataset API). For example,
Dataset<Row> readDataFrame = spark.read()
.format("jdbc")
.option("url", drivingUrl)
.option("dbtable", "Select * from A where country_code='US'")
.option("driver", driver)
.load();
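I want to capture the query: Select * from A where country_code='US'
I tried to use a listener for this, so that I can capture this information for any spark-submit job I run without changing the main code itself.
What I tried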
- QueryExecutionListener
@Override
public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
SparkPlan sparkPlan = qe.executedPlan();
//Tried to search the methods/properties inside it, but couldn't find anything
}
I tried looking in SQLMetrics, the child SparkPlans, and so on, but couldn't find the information I was searching for.
- SparkListenerSQLExecutionStart
@Override
public void onOtherEvent(SparkListenerEvent event) {
if (event instanceof SparkListenerSQLExecutionStart) {
SparkListenerSQLExecutionStart sparkListenerSQLExecutionStart = (SparkListenerSQLExecutionStart) event;
SparkPlanInfo sparkPlanInfo = sparkListenerSQLExecutionStart.sparkPlanInfo();
System.out.println(sparkListenerSQLExecutionStart.description());
System.out.println(sparkListenerSQLExecutionStart.details());
System.out.println(sparkListenerSQLExecutionStart.physicalPlanDescription());
}
}
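Here too, none of these details (nor the others I looked at) contain the query information I'm looking for.
I believe this information can be captured, because I have seen projects like SparkSplineAgent, and questions on Stack Overflow like this one, that have it, but I haven't been able to figure out how.
Can anyone help me with this?
After a lot of trial and error, I finally found a way to do the above. In the listener that implements QueryExecutionListener, I added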
@Override
public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
//start from the analyzed logical plan (not the executed physical plan)
LogicalPlan analyzedPlan = qe.analyzed();
//maintain a queue to keep track of plans to process
Queue<LogicalPlan> queue = new LinkedList<>();
queue.add(analyzedPlan);
while (!queue.isEmpty()) {
//get the first plan from queue
LogicalPlan curPlan = queue.remove();
if (curPlan instanceof LogicalRelation) {
LogicalRelation logicalRelation = (LogicalRelation) curPlan;
BaseRelation baseRelation = logicalRelation.relation();
if (baseRelation instanceof JDBCRelation) {
JDBCRelation jdbcRelation = (JDBCRelation) baseRelation;
System.out.println(jdbcRelation.jdbcOptions().table());
}
System.out.println(logicalRelation.relation());
}
//add all child plans to the queue
//children() returns a Scala Seq, so Iterator here is scala.collection.Iterator
Iterator<LogicalPlan> childItr = curPlan.children().iterator();
while (childItr.hasNext()) {
LogicalPlan logicalPlan = childItr.next();
queue.add(logicalPlan);
}
}
}
This gave me the desired output
SELECT * from A where country_code='US'
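For anyone who wants to check the traversal logic without a Spark cluster, here is a minimal sketch of the same breadth-first walk on a stand-in tree. The names PlanWalkSketch, Node, jdbcTable and collectJdbcTables are hypothetical and made up for illustration; they are not Spark classes, and the real listener would use LogicalPlan, LogicalRelation and JDBCRelation as in the code above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a logical plan tree; none of this is Spark API.
class PlanWalkSketch {

    /** A tiny tree node: a label, an optional JDBC table/query string, and children. */
    static final class Node {
        final String name;
        final String jdbcTable; // null when the node is not a JDBC-backed relation
        final List<Node> children;

        Node(String name, String jdbcTable, List<Node> children) {
            this.name = name;
            this.jdbcTable = jdbcTable;
            this.children = children;
        }
    }

    /** Breadth-first walk that collects the table/query string of every JDBC node. */
    static List<String> collectJdbcTables(Node root) {
        List<String> found = new ArrayList<>();
        Queue<Node> queue = new ArrayDeque<>();
        queue.add(root);
        while (!queue.isEmpty()) {
            // take the next node to process from the head of the queue
            Node cur = queue.remove();
            if (cur.jdbcTable != null) {
                found.add(cur.jdbcTable);
            }
            // enqueue all children so that every node is eventually visited
            queue.addAll(cur.children);
        }
        return found;
    }

    public static void main(String[] args) {
        Node a = new Node("LogicalRelation",
                "Select * from A where country_code='US'",
                Collections.<Node>emptyList());
        Node b = new Node("LogicalRelation", "B", Collections.<Node>emptyList());
        Node join = new Node("Join", null, Arrays.asList(a, b));
        Node root = new Node("Project", null, Collections.singletonList(join));
        // prints the strings found on the two JDBC leaves, left to right
        System.out.println(collectJdbcTables(root));
    }
}
```

The queue gives breadth-first order. Since the goal is only to visit every node, a stack or plain recursion would find the same relations, just in a different order.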