将键值数据库与 Spark 集成
Integrate key-value database with Spark
我无法理解 Spark 如何与存储交互。
我想制作一个从 RocksDB 数据库(或任何其他键值存储)获取数据的 Spark 集群。然而,此时此刻,我能做的最好的事情就是将整个数据集从数据库中提取到每个集群节点的内存中(例如到地图中)并从该对象构建一个 RDD。
我需要做什么才能只获取必要的数据(就像 Spark 对 HDFS 所做的那样)?我已经阅读了有关 Hadoop 输入格式和记录读取器的内容,但我并没有完全掌握我应该实现的内容。
我知道这是一个宽泛的问题,但我真的很感激能帮助我开始。提前谢谢你。
这是一种可能的解决方案。我假设您有要访问的键值存储(在您的情况下为 RocksDB)的客户端库。
KeyValuePair
表示一个 bean class 表示键值存储中的一个键值对。
类
/*Lazy iterator to read from KeyValue store*/
class KeyValueIterator implements Iterator<KeyValuePair> {
public KeyValueIterator() {
//TODO initialize your custom reader using java client library
}
@Override
public boolean hasNext() {
//TODO
}
@Override
public KeyValuePair next() {
//TODO
}
}
class KeyValueReader implements FlatMapFunction<KeyValuePair, KeyValuePair>() {
@Override
public Iterator<KeyValuePair> call(KeyValuePair keyValuePair) throws Exception {
//ignore empty 'keyValuePair' object
return new KeyValueIterator();
}
}
创建键值 RDD
/*list with a dummy KeyValuePair instance*/
ArrayList<KeyValuePair> keyValuePairs = new ArrayList<>();
keyValuePairs.add(new KeyValuePair());
JavaRDD<KeyValuePair> keyValuePairRDD = javaSparkContext.parallelize(keyValuePairs);
/*Read one key-value pair at a time lazily*/
keyValuePairRDD = keyValuePairRDD.flatMap(new KeyValueReader());
注:
上面的解决方案默认创建了一个有两个分区的RDD(其中一个将是空的)。在 keyValuePairRDD
上应用任何转换之前增加分区以在执行程序之间分配处理。
增加分区的不同方法:
keyValuePairRDD.repartition(partitionCounts)
//OR
keyValuePairRDD.partitionBy(...)
我无法理解 Spark 如何与存储交互。
我想制作一个从 RocksDB 数据库(或任何其他键值存储)获取数据的 Spark 集群。然而,此时此刻,我能做的最好的事情就是将整个数据集从数据库中提取到每个集群节点的内存中(例如到地图中)并从该对象构建一个 RDD。
我需要做什么才能只获取必要的数据(就像 Spark 对 HDFS 所做的那样)?我已经阅读了有关 Hadoop 输入格式和记录读取器的内容,但我并没有完全掌握我应该实现的内容。
我知道这是一个宽泛的问题,但我真的很感激能帮助我开始。提前谢谢你。
这是一种可能的解决方案。我假设您有要访问的键值存储(在您的情况下为 RocksDB)的客户端库。
KeyValuePair
表示一个 bean class 表示键值存储中的一个键值对。
类
/*Lazy iterator to read from KeyValue store*/
class KeyValueIterator implements Iterator<KeyValuePair> {
public KeyValueIterator() {
//TODO initialize your custom reader using java client library
}
@Override
public boolean hasNext() {
//TODO
}
@Override
public KeyValuePair next() {
//TODO
}
}
class KeyValueReader implements FlatMapFunction<KeyValuePair, KeyValuePair>() {
@Override
public Iterator<KeyValuePair> call(KeyValuePair keyValuePair) throws Exception {
//ignore empty 'keyValuePair' object
return new KeyValueIterator();
}
}
创建键值 RDD
/*list with a dummy KeyValuePair instance*/
ArrayList<KeyValuePair> keyValuePairs = new ArrayList<>();
keyValuePairs.add(new KeyValuePair());
JavaRDD<KeyValuePair> keyValuePairRDD = javaSparkContext.parallelize(keyValuePairs);
/*Read one key-value pair at a time lazily*/
keyValuePairRDD = keyValuePairRDD.flatMap(new KeyValueReader());
注:
上面的解决方案默认创建了一个有两个分区的RDD(其中一个将是空的)。在 keyValuePairRDD
上应用任何转换之前增加分区以在执行程序之间分配处理。
增加分区的不同方法:
keyValuePairRDD.repartition(partitionCounts)
//OR
keyValuePairRDD.partitionBy(...)