根据 Scala flink 中的另一个数据集过滤数据集
Filter a DataSet in terms of another DataSet in Scala flink
我正在尝试复制此 python 代码:
cond_entropy_x = np.array([entropy(x[y == v]) for v in uy])
其中x
和y
是向量,uy
是y
上的唯一值,例如0,1
.
在flink中,我有:
val uy = y.distinct.collect
val condHx = for (i ← uy)
yield entropy(x.filterWithBcVariable(y)((_, yy) ⇒ yy == i))
不过,似乎 filterWithBcVariable
并没有取 y
上的每个值,它只取第一个。
我也试过:
for (i ← values) yield y.join(x).where(a ⇒ a).equalTo(_ ⇒ i)
但是我运行内存不足。
如何根据 y
上的值过滤 x
?
x.zip(y)
之类的东西可以,但不受支持。
有什么想法吗?
我想出了一个解决方案,可能不是最好的,但至少它有效。
现在,我没有将 x
和 y
分开传递 DataSets
,而是传递了仅包含一列的 DataSet[LabeledVector]
:
val xy = input.map(lv ⇒ LabeledVector(lv.label, DenseVector(lv.vector(0))))
然后我将 xy
传递给我的函数:
def conditionalEntropy(xy: DataSet[LabeledVector]): Double = {
// Get the label
val y = xy map (_.label)
// Get probs for the label
val p = probs(y).toArray.asBreeze
// Get unique values in label
val values = y.distinct.collect
// Compute Conditional Entropy
val condH = for (i ← values)
yield entropy(xy.filter(_.label == i))
p.dot(seq2Breeze(condH))
}
我正在尝试复制此 python 代码:
cond_entropy_x = np.array([entropy(x[y == v]) for v in uy])
其中x
和y
是向量,uy
是y
上的唯一值,例如0,1
.
在flink中,我有:
val uy = y.distinct.collect
val condHx = for (i ← uy)
yield entropy(x.filterWithBcVariable(y)((_, yy) ⇒ yy == i))
不过,似乎 filterWithBcVariable
并没有取 y
上的每个值,它只取第一个。
我也试过:
for (i ← values) yield y.join(x).where(a ⇒ a).equalTo(_ ⇒ i)
但是我运行内存不足。
如何根据 y
上的值过滤 x
?
x.zip(y)
之类的东西可以,但不受支持。
有什么想法吗?
我想出了一个解决方案,可能不是最好的,但至少它有效。
现在,我没有将 x
和 y
分开传递 DataSets
,而是传递了仅包含一列的 DataSet[LabeledVector]
:
val xy = input.map(lv ⇒ LabeledVector(lv.label, DenseVector(lv.vector(0))))
然后我将 xy
传递给我的函数:
def conditionalEntropy(xy: DataSet[LabeledVector]): Double = {
// Get the label
val y = xy map (_.label)
// Get probs for the label
val p = probs(y).toArray.asBreeze
// Get unique values in label
val values = y.distinct.collect
// Compute Conditional Entropy
val condH = for (i ← values)
yield entropy(xy.filter(_.label == i))
p.dot(seq2Breeze(condH))
}