每个 rdd 上的 apache spark 运行 任务
apache spark running task on each rdd
我有一个 rdd,它分布在 spark 环境中的多台机器上。我想在这个 rdd 上的每台工作机器上执行一个函数。
我不想收集 rdd 然后在驱动程序上执行函数。该函数应该在每个执行器上为他们自己的 rdd 单独执行。
我该怎么做
更新(添加代码)
我 运行 这一切都在火花 shell
import org.apache.spark.sql.cassandra.CassandraSQLContext
import java.util.Properties
val cc = new CassandraSQLContext(sc)
val rdd = cc.sql("select * from sams.events where appname = 'test'");
val df = rdd.select("appname", "assetname");
这里我有一个 400 行的 df。我需要将此 df 保存到 sql 服务器 table。当我尝试使用 df.write 方法时,它给了我错误,我已经在一个单独的线程中发布了这些错误
spark dataframe not appending to the table
我可以打开 driverManager 连接并插入行,但这将在 spark 的驱动程序模块中完成
import java.sql._
import com.microsoft.sqlserver.jdbc.SQLServerDriver
// create a Statement from the connection
Statement statement = conn.createStatement();
// insert the data
statement.executeUpdate("INSERT INTO Customers " + "VALUES (1001, 'Simpson', 'Mr.', 'Springfield', 2001)");
String connectionUrl = "jdbc:sqlserver://localhost:1433;" +
"databaseName=AdventureWorks;user=MyUserName;password=*****;";
Connection con = DriverManager.getConnection(connectionUrl);
我需要在执行机上写这个。我怎样才能做到这一点?
为了设置从 worker 到其他系统的连接,我们应该使用 rdd.foreachPartitions(iter => ...)
foreachPartitions
允许您对每个分区执行一个操作,让您可以像本地迭代器一样访问分区的数据。
如果每个分区有足够的数据,设置资源(如数据库连接)的时间可以通过在整个分区上使用这些资源来分摊。
摘要,例如
rdd.foreachPartition(iter =>
//setup db connection
val dbconn = Driver.connect(ip, port)
iter.foreach{element =>
val query = makeQuery(element)
dbconn.execute(query)
}
dbconn.close
}
还可以创建单例资源管理器来为集群的每个 JVM 管理这些资源。有关此类本地资源管理器的完整示例,另请参阅此答案:
我有一个 rdd,它分布在 spark 环境中的多台机器上。我想在这个 rdd 上的每台工作机器上执行一个函数。 我不想收集 rdd 然后在驱动程序上执行函数。该函数应该在每个执行器上为他们自己的 rdd 单独执行。 我该怎么做
更新(添加代码) 我 运行 这一切都在火花 shell
import org.apache.spark.sql.cassandra.CassandraSQLContext
import java.util.Properties
val cc = new CassandraSQLContext(sc)
val rdd = cc.sql("select * from sams.events where appname = 'test'");
val df = rdd.select("appname", "assetname");
这里我有一个 400 行的 df。我需要将此 df 保存到 sql 服务器 table。当我尝试使用 df.write 方法时,它给了我错误,我已经在一个单独的线程中发布了这些错误 spark dataframe not appending to the table
我可以打开 driverManager 连接并插入行,但这将在 spark 的驱动程序模块中完成
import java.sql._
import com.microsoft.sqlserver.jdbc.SQLServerDriver
// create a Statement from the connection
Statement statement = conn.createStatement();
// insert the data
statement.executeUpdate("INSERT INTO Customers " + "VALUES (1001, 'Simpson', 'Mr.', 'Springfield', 2001)");
String connectionUrl = "jdbc:sqlserver://localhost:1433;" +
"databaseName=AdventureWorks;user=MyUserName;password=*****;";
Connection con = DriverManager.getConnection(connectionUrl);
我需要在执行机上写这个。我怎样才能做到这一点?
为了设置从 worker 到其他系统的连接,我们应该使用 rdd.foreachPartitions(iter => ...)
foreachPartitions
允许您对每个分区执行一个操作,让您可以像本地迭代器一样访问分区的数据。
如果每个分区有足够的数据,设置资源(如数据库连接)的时间可以通过在整个分区上使用这些资源来分摊。
摘要,例如
rdd.foreachPartition(iter =>
//setup db connection
val dbconn = Driver.connect(ip, port)
iter.foreach{element =>
val query = makeQuery(element)
dbconn.execute(query)
}
dbconn.close
}
还可以创建单例资源管理器来为集群的每个 JVM 管理这些资源。有关此类本地资源管理器的完整示例,另请参阅此答案: