在 Spark 中存储倒排索引

Store inverted index in Spark

我正在使用Spark创建一个倒排索引(或者更多"posting list",所以帖子的顺序很重要),看起来有点像这样

|   key   |  postings                                    |
----------------------------------------------------------
|   "a"   |  1, 3, 4, 7, 8, 9, 21, 25                    |
|   "b"   |  7, 12, 21, 24, 28, 31, 37, 48, 51, 91       |
|   "c"   |  1, 2, 3, 10, 12, 17, 21, 38, 39, 40, 47     |

注意关键是例如一个字符串,帖子是一个排序列表,例如整数。稍后我将使用帖子列表并对其进行多次迭代(希望非常有效)。

我想知道在 Spark 中创建这样的数据框并最终将其存储到 Parquet 的最佳选择是什么。你建议例如使用嵌套结构?或者更确切地说,为帖子使用数组(尽管如果我想存储的不仅仅是一个 id,而是一个 id 和一个距离,即整数和浮点数的元组,我应该怎么做)?或者您会建议根本不要使用这样的发布列表并采用扁平结构(例如,键,在多次出现相同键的地方发布)?

我会使用列表。收集像 IntegerType 这样的简单值列表将相当容易。像这样:

val df = Seq(
  ("a",1,1.1),("a",3,2.3),("a",4,1.0),("b",7,4.3),("b",12,11.11),("b",21,0.01)
).toDF("key","posting","distance")

val aggregatedDf1 = df.groupBy("key").agg(collect_list(col("posting")) as "postings")

在复杂的 StructType 上执行 collect_list 会更加困难,因为 Hive 聚合函数仅适用于简单类型。

要聚合 StructType,您需要创建一个 UDAFUDAF API 有点繁琐,所以你可以稍微作弊,将列聚合成两个列表,然后使用简单的 UDFzip 这两个列表,像这样:

val zipper = udf[Seq[Tuple2[Int,Double]],Seq[Int],Seq[Double]]((a,b) => a.zip(b))

val aggregatedDf2 = df.groupBy("key").agg(
  collect_list(col("posting")) as "postings",
  collect_list(col("distance")) as "distances"
).withColumn("postings", zipper($"postings", $"distances")).drop("distances")