具有查找 table 的批处理作业 (Spark) 太大而无法放入内存
Batch processing job (Spark) with lookup table that's too big to fit into memory
我正在尝试编写一个批处理作业来处理目前位于 HBase 数据库(在 AWS 的 EMR 集群中)的数百 TB 数据,所有这些都在一个大的 table 中。对于我正在处理的每一行,我需要从第二个 HBase table 中的查找 table(一个简单的整数到字符串映射)中获取额外数据。我们每行要进行 5-10 次查找。
我当前的实现使用一个 Spark 作业,该作业将输入 table 的分区分配给它的工作人员,形状如下:
Configuration hBaseConfig = newHBaseConfig();
hBaseConfig.set(TableInputFormat.SCAN, convertScanToString(scan));
hBaseConfig.set(TableInputFormat.INPUT_TABLE, tableName);
JavaPairRDD<ImmutableBytesWritable, Result> table = sparkContext.newAPIHadoopRDD(hBaseConfig, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
table.map(val -> {
// some preprocessing
}).foreachPartition(p -> {
p.forEachRemaining(row -> {
// code that does the lookup
});
});
问题是查找 table 太大,工作人员的内存无法容纳。他们都需要访问查找的所有部分 table,但他们的访问模式将从缓存中受益匪浅。
我认为我不能将简单的地图用作 broadcast variable 因为它需要装入内存,这对吗?
Spark 使用无共享架构,所以我想没有一种简单的方法可以在所有 worker 之间共享缓存,但是我们可以为每个 worker 构建一个简单的 LRU 缓存吗?
我如何实现这样一个本地工作缓存,以便在缓存未命中时从 HBase 中的查找 table 中获取数据?我可以以某种方式向所有工作人员分发对第二个 table 的引用吗?
除了将 HBase 作为数据源之外,我还没有确定我的技术选择。有没有比 Spark 更适合我的用例的框架?
你有几个选项来处理这个要求:
1- 使用 RDD 或数据集连接
您可以将两个 HBase 表加载为 Spark RDD 或数据集,然后对查找键执行 join
。
Spark 会将两个 RDD 分成多个分区并随机播放内容,以便具有相同键的行最终出现在相同的执行器上。
通过管理 spark 中的分区数量,您应该能够连接任意大小的 2 个表。
2-广播解析器实例
您可以广播执行 HBase 查找和临时 LRU 缓存的解析器实例,而不是广播映射。每个执行者都将获得此实例的副本,并可以管理自己的缓存,您可以在 foreachPartition()
代码中调用它们。
注意,解析器实例需要实现可序列化,因此您必须将缓存、HBase 连接和 HBase 配置属性声明为 transient,以便在每个执行程序上初始化。
我 运行 在我维护的项目之一的 Scala 中进行了这样的设置:如果您知道您的访问模式并有效地管理缓存,它可以工作并且比直接 Spark 连接更有效
3- 使用 HBase Spark 连接器实现查找逻辑
Apache HBase 最近合并了改进的 HBase Spark connectors
目前文档非常稀少,您需要查看 JIRA 票证和这些工具的前身文档
Cloudera's SparkOnHBase but the last unit test in the test suite 看起来很像你想要的
虽然我没有这方面的经验API。
我正在尝试编写一个批处理作业来处理目前位于 HBase 数据库(在 AWS 的 EMR 集群中)的数百 TB 数据,所有这些都在一个大的 table 中。对于我正在处理的每一行,我需要从第二个 HBase table 中的查找 table(一个简单的整数到字符串映射)中获取额外数据。我们每行要进行 5-10 次查找。
我当前的实现使用一个 Spark 作业,该作业将输入 table 的分区分配给它的工作人员,形状如下:
Configuration hBaseConfig = newHBaseConfig();
hBaseConfig.set(TableInputFormat.SCAN, convertScanToString(scan));
hBaseConfig.set(TableInputFormat.INPUT_TABLE, tableName);
JavaPairRDD<ImmutableBytesWritable, Result> table = sparkContext.newAPIHadoopRDD(hBaseConfig, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
table.map(val -> {
// some preprocessing
}).foreachPartition(p -> {
p.forEachRemaining(row -> {
// code that does the lookup
});
});
问题是查找 table 太大,工作人员的内存无法容纳。他们都需要访问查找的所有部分 table,但他们的访问模式将从缓存中受益匪浅。
我认为我不能将简单的地图用作 broadcast variable 因为它需要装入内存,这对吗?
Spark 使用无共享架构,所以我想没有一种简单的方法可以在所有 worker 之间共享缓存,但是我们可以为每个 worker 构建一个简单的 LRU 缓存吗?
我如何实现这样一个本地工作缓存,以便在缓存未命中时从 HBase 中的查找 table 中获取数据?我可以以某种方式向所有工作人员分发对第二个 table 的引用吗?
除了将 HBase 作为数据源之外,我还没有确定我的技术选择。有没有比 Spark 更适合我的用例的框架?
你有几个选项来处理这个要求:
1- 使用 RDD 或数据集连接
您可以将两个 HBase 表加载为 Spark RDD 或数据集,然后对查找键执行 join
。
Spark 会将两个 RDD 分成多个分区并随机播放内容,以便具有相同键的行最终出现在相同的执行器上。
通过管理 spark 中的分区数量,您应该能够连接任意大小的 2 个表。
2-广播解析器实例
您可以广播执行 HBase 查找和临时 LRU 缓存的解析器实例,而不是广播映射。每个执行者都将获得此实例的副本,并可以管理自己的缓存,您可以在 foreachPartition()
代码中调用它们。
注意,解析器实例需要实现可序列化,因此您必须将缓存、HBase 连接和 HBase 配置属性声明为 transient,以便在每个执行程序上初始化。
我 运行 在我维护的项目之一的 Scala 中进行了这样的设置:如果您知道您的访问模式并有效地管理缓存,它可以工作并且比直接 Spark 连接更有效
3- 使用 HBase Spark 连接器实现查找逻辑
Apache HBase 最近合并了改进的 HBase Spark connectors 目前文档非常稀少,您需要查看 JIRA 票证和这些工具的前身文档 Cloudera's SparkOnHBase but the last unit test in the test suite 看起来很像你想要的
虽然我没有这方面的经验API。