Spark - select 语句 运行 多次出现问题

Spark - Issue with select statement run multiple times

请问一个关于 Spark 的小问题。

我想要实现的目标非常简单: 我有一个包含 10 个执行程序的 Spark 集群,我想使用它们。 我需要 运行 查询 select 来自数据库的 10 行。

我的期望是这样的:select 10 行,结果是第 1 2 3 4 5 6 7 8 9 10 行。 然后对每一行应用映射操作。像执行者 1 将操作 Op 应用于该行的第一行,执行者 2 将操作 Op 应用于另一行,等等...

请注意,我的操作 OP 具​​有正确的日志记录和正确的 KPI。

所以,我去试试这个:

 public static void main(String[] args) {
        final String query = "SELECT TOP(10) id, last_name, first_name FROM mytable WHERE ...";
        final SparkSession sparkSession = SparkSession.builder().getOrCreate();
        final Properties   dbConnectionProperties   = new Properties();
        dbConnectionProperties.putAll([...]);

        final Dataset<Row> topTenDataSet = sparkSession.read().jdbc(someUrl, query, dbConnectionProperties);
        topTenDataSet.show();
        
        final Dataset<String> topTenDataSetAfterMap = topTenDataSet.repartition(10).map((MapFunction<Row, String>) row -> performOperationWithLogAndKPI(row), Encoders.STRING());

        LOGGER.info("the count is expected to be 10 " + topTenDataSetAfterMap.count() + topTenDataSetAfterMap.showString(100000, 1000000, false));
        sparkSession.stop();
    }

使用这段代码,会有一个奇怪的结果。

topTenDataSet.show();topTenDataSetAfterMap.count() 都显示 10 行,很高兴。

但是我查看了 Op 操作的日志 performOperationWithLogAndKPI 我可以看到 10 多个日志,10 个指标。也就是说,我可以看到executor 1执行了10次操作,也可以看到executor 2执行了10次操作,等等

似乎每个执行者 运行 都有自己的“SELECT TOP(10) from DB”,并在每个数据集上应用映射函数。

请问:我是不是代码有误?

我的理解有误吗?

如何实现预期的,查询一次,让每个执行者对部分结果集应用一个函数?

谢谢

如果您尝试对同一个数据集执行多个操作,请尝试 cache it。这样 select 前 10 个结果查询应该只执行一次:

final Dataset<Row> topTenDataSet = sparkSession.read().jdbc(someUrl, query, dbConnectionProperties);
topTenDataSet.cache();
topTenDataSet.show();
final Dataset<String> topTenDataSetAfterMap = topTenDataSet.repartition(10).map((MapFunction<Row, String>) row -> performOperationWithLogAndKPI(row), Encoders.STRING());

更多信息here