每个 rdd 上的 apache spark 运行 任务

apache spark running task on each rdd

我有一个 rdd,它分布在 spark 环境中的多台机器上。我想在这个 rdd 上的每台工作机器上执行一个函数。 我不想收集 rdd 然后在驱动程序上执行函数。该函数应该在每个执行器上为他们自己的 rdd 单独执行。 我该怎么做

更新(添加代码) 我 运行 这一切都在火花 shell

import org.apache.spark.sql.cassandra.CassandraSQLContext
import java.util.Properties

 val cc = new CassandraSQLContext(sc)
 val rdd  = cc.sql("select * from sams.events where appname = 'test'");
 val df = rdd.select("appname", "assetname");

这里我有一个 400 行的 df。我需要将此 df 保存到 sql 服务器 table。当我尝试使用 df.write 方法时,它给了我错误,我已经在一个单独的线程中发布了这些错误 spark dataframe not appending to the table

我可以打开 driverManager 连接并插入行,但这将在 spark 的驱动程序模块中完成

import java.sql._
import com.microsoft.sqlserver.jdbc.SQLServerDriver
// create a Statement from the connection
Statement statement = conn.createStatement();

// insert the data
statement.executeUpdate("INSERT INTO Customers " + "VALUES (1001, 'Simpson', 'Mr.', 'Springfield', 2001)");
String connectionUrl = "jdbc:sqlserver://localhost:1433;" +
   "databaseName=AdventureWorks;user=MyUserName;password=*****;";
Connection con = DriverManager.getConnection(connectionUrl);

我需要在执行机上写这个。我怎样才能做到这一点?

为了设置从 worker 到其他系统的连接,我们应该使用 rdd.foreachPartitions(iter => ...)

foreachPartitions 允许您对每个分区执行一个操作,让您可以像本地迭代器一样访问分区的数据。 如果每个分区有足够的数据,设置资源(如数据库连接)的时间可以通过在整个分区上使用这些资源来分摊。

摘要,例如

rdd.foreachPartition(iter => 
   //setup db connection
   val dbconn = Driver.connect(ip, port)
   iter.foreach{element => 
       val query = makeQuery(element)
       dbconn.execute(query)
   }
   dbconn.close
}

还可以创建单例资源管理器来为集群的每个 JVM 管理这些资源。有关此类本地资源管理器的完整示例,另请参阅此答案: