Clojure:分组太慢(1300 万行文件)
Clojure : Group-by too slow (13 million-lines file)
情况
我有一个 1300 万行的 CSV,我想在其中对每个组执行逻辑回归(incanter)。
我的文件是这样的(值只是示例)
ID Max Probability
1 1 0.5
1 5 0.6
1 10 0.99
2 1 0.1
2 7 0.95
所以我先用csv读了一下-reader,一切都很好。
然后我有类似的东西:
( {"Id" "1", "Max" 1, "Probability" 0.5} {"Id" "1", "Max" 5, "Probability" 0.6} etc.
我想按Id对这些值进行分组,如果我没记错的话,大约有120万个Id。 (我在 Python 中用 pandas 做到了,速度非常快)
这是我读取和格式化文件的函数(它在较小的数据集上工作正常):
(defn read-file
[]
(let [path (:path-file @config)
content-csv (take-csv path \,)]
(->> (group-by :Id content-csv)
(map (fn [[k v]]
[k {:x (mapv :Max v) :y (mapv :Probability v)}]))
(into {}))))
我最终希望有类似的东西来执行逻辑回归(我对此很灵活,不需要 :x 和 :y 的向量,seqs 没问题)
{"1" {:x [1 5 10] :y [0.5 0.6 0.99]} "2" {:x [1 7] :y [0.1 0.95]} etc.
问题
我的分组操作有问题。我在 CSV 的输出上单独尝试了它,并且由于 Java 堆 Space 内存而没有消失,这将永远持续下去。
我以为问题出在我的 mapv 上,但这是分组依据。
我考虑过使用 reduce 或 reduce-kv,但我不知道如何将这些函数用于此类目的。
我不关心“:x”和“:y”的顺序(只要它们相同,我的意思是 x 和 y 具有相同的索引...这不是问题,因为它们在同一行上)和最终结果的 ID,我读到该分组保持顺序。
也许这就是手术成本高昂的原因?
如果有人遇到过,我给你样本数据:
(def sample '({"Id" "1" "Max" 1 "Probability" 0.5} {"Id" "1" "Max" 5 "Probability" 0.6} {"Id" "1" "Max" 10 "Probability" 0.99} {"Id" "2" "Max" 1 "Probability" 0.1} {"Id" "2" "Max" 7 "Probability" 0.95}))
其他选择
我有其他想法,但我不确定它们是否 "Clojure" 友好。
在Python中,由于函数的性质和文件已经排序,我没有使用group-by,而是为每个数据帧编写了开始和结束索引分组以便我只需要直接 select 子数据选项卡。
我还可以加载 ID 列表,而不是从 Clojure 计算它。
喜欢
(def ids '("1" "2" 等等
所以也许可以从 :
开始
{"1" {:x [] :y []} "2" {:x [] :y []} etc.
从前面的seq开始,然后在每个ID上匹配大文件。
我不知道实际上它是否更有效率。
其他逻辑回归函数我都有,就是缺这部分!
谢谢!
编辑
感谢您的回答,我终于有了这个解决方案。
在我的 project.clj 文件中
:jvm-opts ["-Xmx13g"])
代码:
(defn data-group->map [group]
{(:Id (first group))
{:x (map :Max group)
:y (map :Probability group)}})
(defn prob-cumsum [data]
(cag/fmap
(fn [x]
(assoc x :y (reductions + (x :y))))
data))
(defn process-data-splitter [data]
(->> (partition-by :Id data)
(map data-group->map)
(into {})
(prob-cumsum)))
我包装了我所有的代码并且它有效。拆分大约需要 5 分钟,但我不需要超高速。内存使用量可以达到文件读取的所有内存,然后是 sigmoid。
如果您的文件按 ID 排序,您可以使用 partition-by
而不是 group-by
。
那么您的代码将如下所示:
(defn data-group->map [group]
[(:Id (first group))
{:x (mapv :Max group)
:y (mapv :Probability group)}])
(defn read-file []
(let [path (:path-file @config)
content-csv (take-csv path \,)]
(->> content-csv
(partition-by :Id)
(map data-group->map)
(into {}))))
这应该会加快速度。
然后你可以使用 transducer
让它更快
(defn read-file []
(let [path (:path-file @config)
content-csv (take-csv path \,)]
(into {} (comp (partition-by :Id)
(map data-group->map))
content-csv)))
让我们做一些测试:
先生成像你这样的海量数据:
(def huge-data
(doall (mapcat #(repeat
1000000
{:Id % :Max 1 :Probability 10})
(range 10))))
我们有千万个项目数据集,有百万个 {:Id 0 :Max 1 :Probability 10}
、百万个 {:Id 1 :Max 1 :Probability 10}
等等。
现待测试功能:
(defn process-data-group-by [data]
(->> (group-by :Id data)
(map (fn [[k v]]
[k {:x (mapv :Max v) :y (mapv :Probability v)}]))
(into {})))
(defn process-data-partition-by [data]
(->> data
(partition-by :Id)
(map data-group->map)
(into {})))
(defn process-data-transducer [data]
(into {} (comp (partition-by :Id) (map data-group->map)) data))
现在进行时间测试:
(do (time (dorun (process-data-group-by huge-data)))
(time (dorun (process-data-partition-by huge-data)))
(time (dorun (process-data-transducer huge-data))))
"Elapsed time: 3377.167645 msecs"
"Elapsed time: 3707.03448 msecs"
"Elapsed time: 1462.955152 msecs"
注意,partition-by
产生的是lazy sequence,而group-by要实现whole collection。因此,如果您需要一组一组的数据,而不是整个地图,您可以删除 (into {})
并更快地访问每个数据:
(defn process-data-partition-by [data]
(->> data
(partition-by :Id)
(map data-group->map)))
检查:
user> (time (def processed-data (process-data-partition-by huge-data)))
"Elapsed time: 0.06079 msecs"
#'user/processed-data
user> (time (let [f (first processed-data)]))
"Elapsed time: 302.200571 msecs"
nil
user> (time (let [f (second processed-data)]))
"Elapsed time: 500.597153 msecs"
nil
user> (time (let [f (last processed-data)]))
"Elapsed time: 2924.588625 msecs"
nil
user.core> (time (let [f (last processed-data)]))
"Elapsed time: 0.037646 msecs"
nil
情况
我有一个 1300 万行的 CSV,我想在其中对每个组执行逻辑回归(incanter)。 我的文件是这样的(值只是示例)
ID Max Probability
1 1 0.5
1 5 0.6
1 10 0.99
2 1 0.1
2 7 0.95
所以我先用csv读了一下-reader,一切都很好。
然后我有类似的东西:
( {"Id" "1", "Max" 1, "Probability" 0.5} {"Id" "1", "Max" 5, "Probability" 0.6} etc.
我想按Id对这些值进行分组,如果我没记错的话,大约有120万个Id。 (我在 Python 中用 pandas 做到了,速度非常快)
这是我读取和格式化文件的函数(它在较小的数据集上工作正常):
(defn read-file
[]
(let [path (:path-file @config)
content-csv (take-csv path \,)]
(->> (group-by :Id content-csv)
(map (fn [[k v]]
[k {:x (mapv :Max v) :y (mapv :Probability v)}]))
(into {}))))
我最终希望有类似的东西来执行逻辑回归(我对此很灵活,不需要 :x 和 :y 的向量,seqs 没问题)
{"1" {:x [1 5 10] :y [0.5 0.6 0.99]} "2" {:x [1 7] :y [0.1 0.95]} etc.
问题
我的分组操作有问题。我在 CSV 的输出上单独尝试了它,并且由于 Java 堆 Space 内存而没有消失,这将永远持续下去。 我以为问题出在我的 mapv 上,但这是分组依据。
我考虑过使用 reduce 或 reduce-kv,但我不知道如何将这些函数用于此类目的。
我不关心“:x”和“:y”的顺序(只要它们相同,我的意思是 x 和 y 具有相同的索引...这不是问题,因为它们在同一行上)和最终结果的 ID,我读到该分组保持顺序。 也许这就是手术成本高昂的原因?
如果有人遇到过,我给你样本数据:
(def sample '({"Id" "1" "Max" 1 "Probability" 0.5} {"Id" "1" "Max" 5 "Probability" 0.6} {"Id" "1" "Max" 10 "Probability" 0.99} {"Id" "2" "Max" 1 "Probability" 0.1} {"Id" "2" "Max" 7 "Probability" 0.95}))
其他选择
我有其他想法,但我不确定它们是否 "Clojure" 友好。
在Python中,由于函数的性质和文件已经排序,我没有使用group-by,而是为每个数据帧编写了开始和结束索引分组以便我只需要直接 select 子数据选项卡。
我还可以加载 ID 列表,而不是从 Clojure 计算它。 喜欢
(def ids '("1" "2" 等等
所以也许可以从 :
开始{"1" {:x [] :y []} "2" {:x [] :y []} etc.
从前面的seq开始,然后在每个ID上匹配大文件。
我不知道实际上它是否更有效率。
其他逻辑回归函数我都有,就是缺这部分! 谢谢!
编辑
感谢您的回答,我终于有了这个解决方案。
在我的 project.clj 文件中
:jvm-opts ["-Xmx13g"])
代码:
(defn data-group->map [group]
{(:Id (first group))
{:x (map :Max group)
:y (map :Probability group)}})
(defn prob-cumsum [data]
(cag/fmap
(fn [x]
(assoc x :y (reductions + (x :y))))
data))
(defn process-data-splitter [data]
(->> (partition-by :Id data)
(map data-group->map)
(into {})
(prob-cumsum)))
我包装了我所有的代码并且它有效。拆分大约需要 5 分钟,但我不需要超高速。内存使用量可以达到文件读取的所有内存,然后是 sigmoid。
如果您的文件按 ID 排序,您可以使用 partition-by
而不是 group-by
。
那么您的代码将如下所示:
(defn data-group->map [group]
[(:Id (first group))
{:x (mapv :Max group)
:y (mapv :Probability group)}])
(defn read-file []
(let [path (:path-file @config)
content-csv (take-csv path \,)]
(->> content-csv
(partition-by :Id)
(map data-group->map)
(into {}))))
这应该会加快速度。 然后你可以使用 transducer
让它更快(defn read-file []
(let [path (:path-file @config)
content-csv (take-csv path \,)]
(into {} (comp (partition-by :Id)
(map data-group->map))
content-csv)))
让我们做一些测试:
先生成像你这样的海量数据:
(def huge-data
(doall (mapcat #(repeat
1000000
{:Id % :Max 1 :Probability 10})
(range 10))))
我们有千万个项目数据集,有百万个 {:Id 0 :Max 1 :Probability 10}
、百万个 {:Id 1 :Max 1 :Probability 10}
等等。
现待测试功能:
(defn process-data-group-by [data]
(->> (group-by :Id data)
(map (fn [[k v]]
[k {:x (mapv :Max v) :y (mapv :Probability v)}]))
(into {})))
(defn process-data-partition-by [data]
(->> data
(partition-by :Id)
(map data-group->map)
(into {})))
(defn process-data-transducer [data]
(into {} (comp (partition-by :Id) (map data-group->map)) data))
现在进行时间测试:
(do (time (dorun (process-data-group-by huge-data)))
(time (dorun (process-data-partition-by huge-data)))
(time (dorun (process-data-transducer huge-data))))
"Elapsed time: 3377.167645 msecs"
"Elapsed time: 3707.03448 msecs"
"Elapsed time: 1462.955152 msecs"
注意,partition-by
产生的是lazy sequence,而group-by要实现whole collection。因此,如果您需要一组一组的数据,而不是整个地图,您可以删除 (into {})
并更快地访问每个数据:
(defn process-data-partition-by [data]
(->> data
(partition-by :Id)
(map data-group->map)))
检查:
user> (time (def processed-data (process-data-partition-by huge-data)))
"Elapsed time: 0.06079 msecs"
#'user/processed-data
user> (time (let [f (first processed-data)]))
"Elapsed time: 302.200571 msecs"
nil
user> (time (let [f (second processed-data)]))
"Elapsed time: 500.597153 msecs"
nil
user> (time (let [f (last processed-data)]))
"Elapsed time: 2924.588625 msecs"
nil
user.core> (time (let [f (last processed-data)]))
"Elapsed time: 0.037646 msecs"
nil