PHOENIX SPARK - 将 Table 加载为 DataFrame
PHOENIX SPARK - Load Table as DataFrame
我已经从具有 5 亿行的 HBase Table (PHOENIX) 创建了一个 DataFrame。我从 DataFrame 创建了一个 JavaBean 的 RDD,并将其用于连接文件中的数据。
Map<String, String> phoenixInfoMap = new HashMap<String, String>();
phoenixInfoMap.put("table", tableName);
phoenixInfoMap.put("zkUrl", zkURL);
DataFrame df = sqlContext.read().format("org.apache.phoenix.spark").options(phoenixInfoMap).load();
JavaRDD<Row> tableRows = df.toJavaRDD();
JavaPairRDD<String, AccountModel> dbData = tableRows.mapToPair(
new PairFunction<Row, String, String>()
{
@Override
public Tuple2<String, String> call(Row row) throws Exception
{
return new Tuple2<String, String>(row.getAs("ID"), row.getAs("NAME"));
}
});
现在我的问题 - 假设该文件有 2 个独特的百万条目与 table 匹配。是将整个 table 作为 RDD 加载到内存中,还是仅将 table 中匹配的 200 万条记录作为 RDD 加载到内存中?
你的陈述
DataFrame df = sqlContext.read().format("org.apache.phoenix.spark").options(phoenixInfoMap)
.load();
会将整个 table 加载到内存中。您没有为 phoenix 提供任何过滤器以向下推送到 hbase - 从而减少读取的行数。
如果您连接到非 HBase 数据源 - 例如平面文件 - 那么首先需要读入来自 hbase table 的所有记录。记录与辅助数据不匹配source 不会保存在新的 DataFrame 中——但初始读取仍然会发生。
更新 一种可能的方法是预处理文件 - 即提取您想要的 ID。将结果存储到一个新的 HBase table。然后通过 Phoenix 而不是 Spark 直接在 HBase 中执行连接。
该方法的基本原理是将计算移至数据。大部分数据驻留在 HBase 中——因此将小数据(文件中的 ID)移动到那里。
我不直接熟悉 Phoenix,只是它在 hbase 之上提供了一个 sql 层。据推测,它将能够进行这样的连接并将结果存储在单独的 HBase table ..?然后可以将单独的 table 加载到 Spark 中以用于后续计算。
我已经从具有 5 亿行的 HBase Table (PHOENIX) 创建了一个 DataFrame。我从 DataFrame 创建了一个 JavaBean 的 RDD,并将其用于连接文件中的数据。
Map<String, String> phoenixInfoMap = new HashMap<String, String>();
phoenixInfoMap.put("table", tableName);
phoenixInfoMap.put("zkUrl", zkURL);
DataFrame df = sqlContext.read().format("org.apache.phoenix.spark").options(phoenixInfoMap).load();
JavaRDD<Row> tableRows = df.toJavaRDD();
JavaPairRDD<String, AccountModel> dbData = tableRows.mapToPair(
new PairFunction<Row, String, String>()
{
@Override
public Tuple2<String, String> call(Row row) throws Exception
{
return new Tuple2<String, String>(row.getAs("ID"), row.getAs("NAME"));
}
});
现在我的问题 - 假设该文件有 2 个独特的百万条目与 table 匹配。是将整个 table 作为 RDD 加载到内存中,还是仅将 table 中匹配的 200 万条记录作为 RDD 加载到内存中?
你的陈述
DataFrame df = sqlContext.read().format("org.apache.phoenix.spark").options(phoenixInfoMap)
.load();
会将整个 table 加载到内存中。您没有为 phoenix 提供任何过滤器以向下推送到 hbase - 从而减少读取的行数。
如果您连接到非 HBase 数据源 - 例如平面文件 - 那么首先需要读入来自 hbase table 的所有记录。记录与辅助数据不匹配source 不会保存在新的 DataFrame 中——但初始读取仍然会发生。
更新 一种可能的方法是预处理文件 - 即提取您想要的 ID。将结果存储到一个新的 HBase table。然后通过 Phoenix 而不是 Spark 直接在 HBase 中执行连接。
该方法的基本原理是将计算移至数据。大部分数据驻留在 HBase 中——因此将小数据(文件中的 ID)移动到那里。
我不直接熟悉 Phoenix,只是它在 hbase 之上提供了一个 sql 层。据推测,它将能够进行这样的连接并将结果存储在单独的 HBase table ..?然后可以将单独的 table 加载到 Spark 中以用于后续计算。