在 Sparklyr 中创建新的 Spark 表或数据框的最有效方法是什么?

What is the most efficient way to create new Spark Tables or Data Frames in Sparklyr?

在 Hadoop 集群(而非 VM)上使用 sparklyr 包,我正在处理需要加入、过滤等的几种类型的 tables...我正在尝试确定使用 dplyr 命令以及 sparklyr 到 运行 处理中的数据管理功能、将其存储在缓存中并与中间数据对象以生成保留在缓存中的下游对象。上面提出的这个问题很肤浅,但我希望获得比纯粹效率更多的信息,所以如果你想编辑我的问题,我可以接受...

我在 Hive 中有一些 table,我们称它们为 Activity2016Accounts2016Accounts2017。 "Accounts" table 还包括地址历史记录。我想从 2016 年的数据开始,合并名称和当前地址上的两个 table,筛选一些 activity 和帐户详细信息,然后将两种不同的方式与 2017 年的帐户信息合并,特别是统计留在他们地址的人与改变地址的人。我们有数百万行,所以我们正在使用我们的 spark 集群 activity。

所以,首先,这就是我现在正在做的事情:

sc <- spark_connect()    

Activity2016 %>% filter(COL1 < Cut1 & COL1 > Cut2) %>% 
select(NAME,ADDRESS1) %>% 
inner_join(Accounts2016,c("NAME"="NAME","ADDRESS1"="ADDRESS1")) %>%
distinct(NAME,ADDRESS1) %>% sdf_register("JOIN2016")

tbl_cache(sc,"JOIN2016")
JOINED_2016 <- tbl(sc, "JOIN2016")

Acct2017 = tbl(sc, "HiveDB.Accounts2017")

# Now, I can run:
JOINED_2016 %>% inner_join(Acct2017,c("NAME"="NAME","ADDRESS1"="ADDRESS2")) %>%
distinct(NAME,ADDRESS1.x) %>% sdf_register("JOIN2017")

# Rinse & Repeat
tbl_cache(sc,"JOIN2017")
JOINED_2017 <- tbl(sc,"JOIN2017")

然后我继续使用 JOINED_2016JOINED_2017,使用 dplyr 动词等...

这里似乎有很多低效的地方……比如, 1) 难道我不能直接将它发送到缓存并将其作为变量调用吗? 2) 我不应该也可以将它直接发送到书面 Hive table 中吗? 3) 如何将最终对象转换为 运行 基本 R 命令,如 table(JOINED_2016$COL1) 或者那些不可用的命令(我在尝试 %>% select(COL1) %>% table 时遇到错误)?

如果下游有错误,我会丢失数据,我不会写...但是我觉得关于如何写入数据的选择太多了,我不太清楚。它什么时候最终成为缓存对象,相对于 RDD,相对于 Hive internal/external table,相对于 Spark DataFrame 以及每个 R 处理这些数据的能力有哪些限制对象?

例如,如果我只是 运行:

JOIN2016 <- Activity2016 %>% filter(COL1 < Cut1 & COL1 > Cut2) %>% 
select(NAME,ADDRESS1) %>% 
inner_join(Accounts2016,c("NAME"="NAME","ADDRESS1"="ADDRESS1")) %>%
distinct(NAME,ADDRESS1) 

这会是 R data.frame 吗? (这可能会使我的网关节点的 RAM 崩溃......这就是为什么我不愿意尝试它。这是业务集群)

所以总结一下: 我应该为 tbltbl_cache 命令烦恼还是需要它们?

我应该使用 dbWriteTable 吗?我可以在 sdf_register 之后、之前或代替 sdf_register 直接使用吗?或者我需要在我之前使用 tbl 命令吗?可以向 Hive 写入任何内容吗? sdf_register 几乎没有意义。

我应该使用 copy_to 还是 db_copy_to 而不是 dbWriteTable?我不想把 Hive 变成垃圾场,所以我想小心我如何编写中间数据,然后在我存储它之后 R 将如何使用它保持一致。

我必须 运行 使用这些 data.frame 类型中的哪些类型来处理数据,就像它是内存中的 R 对象一样,还是我只能使用 dplyr 命令?

抱歉,这个问题太多了,但我觉得 R-bloggers 文章、sparklyr 教程以及 SOF 上的其他问题都没有明确说明这些问题。

sdf_register 在处理长 运行 查询时不是很有用。它基本上是一个非物化视图,这意味着它会在您每次调用它时运行底层查询。添加以下内容会将数据作为 table 写入 Hive。

spark_dataframe %>% invoke("write") %>% invoke("saveAsTable", as.character("your_desired_table_name"))

这将 saveAsTable 用作 table,这将创建一个 table 并保留 table,即使在 Spark 会话结束后也是如此。使用 createOrReplaceTempView 不会在 Spark 会话结束时保留数据。