将 SparkR DataFrame 序列化为 jobj

Serialize SparkR DataFrame to jobj

我希望能够在 SparkR SparkDataFrame 上使用 Java 方法将数据写入 Cassandra。

例如,使用 sparklyr 扩展,我可以这样做:

sparklyr::invoke(sparklyr::spark_dataframe(spark_tbl), "write") %>>% 
sparklyr::invoke("format", "org.apache.spark.sql.cassandra") %>>% 
sparklyr::invoke("option", "keyspace", keyspace) %>>% 
sparklyr::invoke("option", "table", table) %>>%
sparklyr::invoke("mode", "append") %>% 
sparklyr::invoke("save")

可以达到每秒约20k行的写入速度。

然而,对于我的用例,我希望能够使用 SparkR::spark.lapply 以便我可以在本地收集我的 Cassandra table 的子集,运行 在它们上面的脚本并将数据写回。我尝试使用 sparklyr 的每一种方法都以单线程结束,因此根本没有真正利用 spark。

使用 SparkR,我可以使用如下方式写入数据:

SparkR::saveDF(SparkR::as.DataFrame(dt_local), "",
               source = "org.apache.spark.sql.cassandra",
               table = table,
               keyspace = keyspace,
               mode = "append")

但是在这种情况下,写入速度接近每秒 2k 行。我想我可以使用 SparkR::sparkR.callJMethod 调用与 sparklyr 情况下相同的链来实现更高的写入速度,但是我首先需要序列化 ​​SparkDataFrame 这样有一个 jobj 的句柄,我没能做到。这可能吗?

如果可能的话,我也愿意接受任何其他实现此目的的方法。我调查了试图在 sparkRsparklyr 之间移动,但似乎后端差异太大(据我所知)。我还相信 here 目前还没有 sparklyr 的类似 lapply

感谢您的帮助

长话短说这是不可能的。 Apache Spark 不支持,而且很可能永远不会支持嵌套并行化操作。这与特定的后端无关。您可以尝试对本机 R 客户端(dbConnectRCassandra)使用 SparkR::*apply 方法。

您可以访问 JVM 对象:

SparkR::as.DataFrame(dt_local)@sdf

但它根本不能在驱动程序节点之外使用。