Spark: querying MongoDB with Stratio and RDD

I am using Stratio (0.11.) to query MongoDB from Spark. I want to work with RDDs directly (no DataFrame).

This is what I am doing right now:

val mongoRDD = new MongodbRDD(sqlContext, readConfig, new MongodbPartitioner(readConfig))
mongoRDD.foreach(println)

and it prints the contents of the collection correctly.

Is there a way to apply a query (as a String, or built with QueryBuilder) to the MongodbRDD with Stratio? In my case the query is of the $near type.

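As @zero323 hinted, the way to do this is the filters parameter. The library inspects these filters and maps each one to the corresponding filter available in the MongoDB QueryBuilder.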

From the Spark-MongoDB source code:

sFilters.foreach {
    case EqualTo(attribute, value) =>
      queryBuilder.put(attribute).is(checkObjectID(attribute, value))
    case GreaterThan(attribute, value) =>
      queryBuilder.put(attribute).greaterThan(checkObjectID(attribute, value))
    case GreaterThanOrEqual(attribute, value) =>
      queryBuilder.put(attribute).greaterThanEquals(checkObjectID(attribute, value))
    case In(attribute, values) =>
      queryBuilder.put(attribute).in(values.map(value => checkObjectID(attribute, value)))
    case LessThan(attribute, value) =>
      queryBuilder.put(attribute).lessThan(checkObjectID(attribute, value))
    case LessThanOrEqual(attribute, value) =>
      queryBuilder.put(attribute).lessThanEquals(checkObjectID(attribute, value))
    case IsNull(attribute) =>
      queryBuilder.put(attribute).is(null)
    case IsNotNull(attribute) =>
      queryBuilder.put(attribute).notEquals(null)
    case And(leftFilter, rightFilter) if !parentFilterIsNot =>
      queryBuilder.and(filtersToDBObject(Array(leftFilter)), filtersToDBObject(Array(rightFilter)))
    case Or(leftFilter, rightFilter)  if !parentFilterIsNot =>
      queryBuilder.or(filtersToDBObject(Array(leftFilter)), filtersToDBObject(Array(rightFilter)))
    case StringStartsWith(attribute, value) if !parentFilterIsNot =>
      queryBuilder.put(attribute).regex(Pattern.compile("^" + value + ".*$"))
    case StringEndsWith(attribute, value) if !parentFilterIsNot =>
      queryBuilder.put(attribute).regex(Pattern.compile("^.*" + value + "$"))
    case StringContains(attribute, value) if !parentFilterIsNot =>
      queryBuilder.put(attribute).regex(Pattern.compile(".*" + value + ".*"))
    case Not(filter) =>
      filtersToDBObject(Array(filter), true)
  }

As you can see, near is not handled, but it looks like it could easily be added to the connector, because QueryBuilder offers methods to use that MongoDB function.

You could try modifying the connector yourself. In any case, I will try to implement it and open a PR over the next few days.
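To illustrate the kind of change involved, here is a minimal, self-contained sketch of translating filters into a MongoDB-style query document, with an extra case for $near. It is not the connector's code: the `Filter` types below are stand-ins defined only for this example (the connector matches on Spark's source filters and writes to a QueryBuilder), `Near` and `FilterSketch` are hypothetical names, and the query is modelled as a plain `Map` so the example runs without Spark or the MongoDB driver. It also assumes the legacy coordinate-pair form of $near.

```scala
// Stand-in filter types, defined here only for illustration.
sealed trait Filter
final case class EqualTo(attribute: String, value: Any) extends Filter
final case class GreaterThan(attribute: String, value: Any) extends Filter
// Hypothetical filter describing a $near query on a legacy coordinate pair.
final case class Near(attribute: String, x: Double, y: Double) extends Filter

object FilterSketch {
  // Fold the filters into a nested Map that mirrors the MongoDB query document.
  // Simplification: a later filter on the same attribute overwrites an earlier one.
  def toQuery(filters: Seq[Filter]): Map[String, Any] =
    filters.foldLeft(Map.empty[String, Any]) { (query, filter) =>
      filter match {
        case EqualTo(attribute, value) =>
          query + (attribute -> value)
        case GreaterThan(attribute, value) =>
          query + (attribute -> Map("$gt" -> value))
        case Near(attribute, x, y) =>
          // Corresponds to { attribute: { $near: [x, y] } }
          query + (attribute -> Map("$near" -> List(x, y)))
      }
    }
}
```

Calling `FilterSketch.toQuery(Seq(Near("x", 3.0, 4.0)))` yields a map equivalent to the query document `{ x: { $near: [3.0, 4.0] } }`. In the real connector the new case would presumably call the QueryBuilder method for near instead of building a Map.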

Edit:

A PR has been opened that includes a source filter type describing $near, so you can use MongodbRDD as:

val mongoRDD = new MongodbRDD(
    sqlContext,
    readConfig,
    new MongodbPartitioner(readConfig),
    filters = FilterSection(Array(Near("x", 3.0, 4.0)))
)