根据 Scala flink 中的另一个数据集过滤数据集

Filter a DataSet in terms of another DataSet in Scala flink

我正在尝试复制此 python 代码:

cond_entropy_x = np.array([entropy(x[y == v]) for v in uy])

其中xy是向量,uyy上的唯一值,例如0,1.

在flink中,我有:

val uy = y.distinct.collect
val condHx = for (i ← uy)
    yield entropy(x.filterWithBcVariable(y)((_, yy) ⇒ yy == i))

不过,似乎 filterWithBcVariable 并没有取 y 上的每个值,它只取第一个。

我也试过:

for (i ← values) yield y.join(x).where(a ⇒ a).equalTo(_ ⇒ i)

但是我运行内存不足。

如何根据 y 上的值过滤 x

x.zip(y) 之类的东西可以,但不受支持。

有什么想法吗?

我想出了一个解决方案,可能不是最好的,但至少它有效。

现在,我没有将 xy 分开传递 DataSets,而是传递了仅包含一列的 DataSet[LabeledVector]

val xy = input.map(lv ⇒ LabeledVector(lv.label, DenseVector(lv.vector(0))))

然后我将 xy 传递给我的函数:

def conditionalEntropy(xy: DataSet[LabeledVector]): Double = {
    // Get the label
    val y = xy map (_.label)
    // Get probs for the label
    val p = probs(y).toArray.asBreeze
    // Get unique values in label
    val values = y.distinct.collect
    // Compute Conditional Entropy
    val condH = for (i ← values)
      yield entropy(xy.filter(_.label == i))
    p.dot(seq2Breeze(condH))
  }