将字符串特征转换为数字特征:算法效率
Converting string features to numeric features: algorithm efficiency
我正在将几列字符串转换为我可以在 LabeledPoint
中使用的数字特征。我正在考虑两种方法:
- 创建字符串到双精度数的映射,遍历 RDD 并查找每个字符串并分配适当的值。
- 按列对RDD进行排序,用计数器遍历RDD,将每个字符串分配给当前计数器值,直到字符串发生变化,此时计数器值递增并赋值。由于我们从来没有看到一个字符串两次(由于排序),这将有效地为每个字符串分配一个唯一的值。
在第一种方法中,我们必须为地图收集唯一值。我不确定这需要多长时间(线性时间?)。然后我们遍历值列表并构建一个 HashMap - 线性时间和内存。最后我们迭代并查找每个值,N * eC(有效常数时间)。
在第二种方法中,我们排序(n log n 次)然后迭代并跟踪一个简单的计数器和一些变量。
推荐什么方法?有内存、性能和编码风格方面的考虑。第一感觉像 2N + eC * N 和 N * (String, Double) 内存,可以写成函数式风格。第二个是 N log N + N 但感觉势在必行。 Spark 需要传输静态地图吗?我可以看到这是一个交易破坏者。
不幸的是,第二种方法不起作用,原因是您无法读取表格计数器,您只能递增它。更糟糕的是,你真的不知道什么时候值发生变化,你没有状态来记住以前的向量。我猜你可以使用像 mapPartition 和 total order partitioner 这样的东西。您必须知道您的分区是按顺序处理的,并且在多个分区中不能有相同的密钥,但这感觉真的很糟糕(而且我不知道它是否可行)。
我认为不可能一次完成。但是你可以一分为二。在您的第一种方法中,您可以使用例如 set accumulator 将所有值放入其中,然后在驱动程序中对它们进行编号,并在第二遍中使用它们来替换它们。复杂度为 2N(假设值的数量 << N)。
编辑:
implicit object SetAcc extends AccumulatorParam[Set[String]] {
def zero(s: Set[String]) = Set()
def addInPlace(s1: Set[String], s2: Set[String]) = s1 ++ s2
}
val rdd = sc.parallelize(
List((1, "a"), (2, "a"), (3, "b"), (4, "a"), (5, "c"), (6, "b"))
)
val acc: Accumulator[Set[String]] = sc.accumulator(Set())
rdd.foreach(p => acc += Set(p._2))
val encoding = acc.value.zipWithIndex.toMap
val result = rdd map {p => (p._1, encoding(p._2))}
如果你觉得这本词典太大了,当然可以转发。如果你有很多特征和值,并且你不想创建那么多大的累加器,那么你可以使用 reduce 函数将它们一起处理并收集驱动程序。只是我的想法。我猜你只需要尝试看看什么最适合你的用例。
编辑:
在 mllib 中有 class 用于此目的 HashingTF
。它允许您一次翻译您的数据集。缺点是它使用散列模指定参数将对象映射到双精度。如果参数太小,这可能会导致冲突。
val tf = new HashingTF(numFeatures = 10000)
val transformed = data.map(line => tf.transform(line.split("""\s+"""))
当然,您可以在不使用 HashingTF
class.
的情况下手动完成相同的操作
我正在将几列字符串转换为我可以在 LabeledPoint
中使用的数字特征。我正在考虑两种方法:
- 创建字符串到双精度数的映射,遍历 RDD 并查找每个字符串并分配适当的值。
- 按列对RDD进行排序,用计数器遍历RDD,将每个字符串分配给当前计数器值,直到字符串发生变化,此时计数器值递增并赋值。由于我们从来没有看到一个字符串两次(由于排序),这将有效地为每个字符串分配一个唯一的值。
在第一种方法中,我们必须为地图收集唯一值。我不确定这需要多长时间(线性时间?)。然后我们遍历值列表并构建一个 HashMap - 线性时间和内存。最后我们迭代并查找每个值,N * eC(有效常数时间)。
在第二种方法中,我们排序(n log n 次)然后迭代并跟踪一个简单的计数器和一些变量。
推荐什么方法?有内存、性能和编码风格方面的考虑。第一感觉像 2N + eC * N 和 N * (String, Double) 内存,可以写成函数式风格。第二个是 N log N + N 但感觉势在必行。 Spark 需要传输静态地图吗?我可以看到这是一个交易破坏者。
不幸的是,第二种方法不起作用,原因是您无法读取表格计数器,您只能递增它。更糟糕的是,你真的不知道什么时候值发生变化,你没有状态来记住以前的向量。我猜你可以使用像 mapPartition 和 total order partitioner 这样的东西。您必须知道您的分区是按顺序处理的,并且在多个分区中不能有相同的密钥,但这感觉真的很糟糕(而且我不知道它是否可行)。
我认为不可能一次完成。但是你可以一分为二。在您的第一种方法中,您可以使用例如 set accumulator 将所有值放入其中,然后在驱动程序中对它们进行编号,并在第二遍中使用它们来替换它们。复杂度为 2N(假设值的数量 << N)。
编辑:
implicit object SetAcc extends AccumulatorParam[Set[String]] {
def zero(s: Set[String]) = Set()
def addInPlace(s1: Set[String], s2: Set[String]) = s1 ++ s2
}
val rdd = sc.parallelize(
List((1, "a"), (2, "a"), (3, "b"), (4, "a"), (5, "c"), (6, "b"))
)
val acc: Accumulator[Set[String]] = sc.accumulator(Set())
rdd.foreach(p => acc += Set(p._2))
val encoding = acc.value.zipWithIndex.toMap
val result = rdd map {p => (p._1, encoding(p._2))}
如果你觉得这本词典太大了,当然可以转发。如果你有很多特征和值,并且你不想创建那么多大的累加器,那么你可以使用 reduce 函数将它们一起处理并收集驱动程序。只是我的想法。我猜你只需要尝试看看什么最适合你的用例。
编辑:
在 mllib 中有 class 用于此目的 HashingTF
。它允许您一次翻译您的数据集。缺点是它使用散列模指定参数将对象映射到双精度。如果参数太小,这可能会导致冲突。
val tf = new HashingTF(numFeatures = 10000)
val transformed = data.map(line => tf.transform(line.split("""\s+"""))
当然,您可以在不使用 HashingTF
class.