Parallel cosine similarity of two large files with each other
I have two files: A and B
A has 400,000 lines, each with 50 float values.
B has 40,000 lines, each with 50 float values.
For each line in B, I need to find the corresponding lines in A with >90% (cosine) similarity.
With a linear search and computation, the code takes a huge amount of computation time (40-50 hours).
Asking the community for suggestions on how to speed up the process (links to blogs/resources, e.g. whether AWS/cloud could be used to implement it). I've been stuck on this for quite a long time!
[Someone mentioned that rpud/rpudplus could do this, but it doesn't seem possible to run it on cloud resources.]
N.B. As requested, the code for the cosine similarity is:
import os
from scipy import spatial

for line1, line2 in zip(f1, f2):
    line1 = line1[:-1]
    cnt = cnt + 1
    l2X = [float(i) for i in line2.split()]
    f3 = open(enLabelValues, 'r')
    f4 = open(enVectorValues, 'r')
    print(cnt)
    cnt_j = 0
    for line3, line4 in zip(f3, f4):
        line3 = line3[:-1]
        l4X = [float(i) for i in line4.split()]
        # This is the line for the spatial cosine similarity
        result = 1 - spatial.distance.cosine(l2X, l4X)
        cnt_j = cnt_j + 1
        if result > 0.95:
            if line3 not in a:
                a[line3] = True
                fLabel_2.write(line3 + "\n")
                fX_2.write(line4)
                fLabel_2.flush()
                fX_2.flush()
                os.fsync(fLabel_2.fileno())
                os.fsync(fX_2.fileno())
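The nested Python loops above recompute every pair one element at a time; the whole comparison collapses into a few NumPy matrix operations. Here is a minimal sketch of that idea, assuming the two files fit in memory as (lines, 50) arrays; the helper name, chunk size, and demo data are illustrative, not from the original code:

```python
import numpy as np

def cosine_matches(A, B, threshold=0.9):
    """Return (b_index, a_index, similarity) triples where cosine(B[b], A[a]) > threshold."""
    # Normalise every row once, so cosine similarity becomes a plain dot product.
    A_unit = A / np.linalg.norm(A, axis=1, keepdims=True)
    B_unit = B / np.linalg.norm(B, axis=1, keepdims=True)
    matches = []
    # Compare B against A in chunks so the similarity matrix stays small in memory.
    for start in range(0, B_unit.shape[0], 1000):
        sims = B_unit[start:start + 1000] @ A_unit.T   # (chunk, len(A)) similarities
        b_idx, a_idx = np.nonzero(sims > threshold)
        matches.extend((start + b, a, float(sims[b, a])) for b, a in zip(b_idx, a_idx))
    return matches

# Tiny demo with random data; the real inputs would be loaded with np.loadtxt.
rng = np.random.default_rng(0)
A = rng.random((1000, 50)).astype(np.float32)
B = rng.random((100, 50)).astype(np.float32)
found = cosine_matches(A, B, 0.9)
```

The matrix multiply runs in optimised BLAS code rather than interpreted Python, which is typically orders of magnitude faster than per-pair `spatial.distance.cosine` calls.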
I can generate synthetic files of 40,000 and 400,000 lines, with 50 samples per line, and process them in around 2 minutes 18 seconds on a reasonable 4-core (+hyperthreading) desktop Mac, in my clumsy style of C++ without any SIMD optimisation (by me), using GNU Parallel.
Here is the top-level script. You can see it generates the test data in "a.txt" and "b.txt". Then it "compresses" "b.txt" into an identical binary representation, with a precomputed magnitude appended to each line. Finally, it numbers the lines in "a.txt" and passes them to GNU Parallel, which splits the lines into groups of around 5,200 lines and starts a group of 8 parallel processes to compare each of those against the 40,000 lines in B.
#!/bin/bash
# Generate test data - a.txt b.txt
./generate
# Preprocess b.txt into binary with precomputed magnitudes, saved as B
./preprocess
# Process file A in batches
cat -n a.txt | parallel --block-size 2M --line-buffer --pipe ./process {#}
Here is the generate.cpp program for the synthetic data:
#include <iostream>
#include <cstdlib>
#include <fstream>
#include "common.h"

using namespace std;

int main()
{
    int line, sample;
    ofstream a("a.txt");
    if (!a.is_open()) {
        cerr << "ERROR: Unable to open output file";
        exit(EXIT_FAILURE);
    }
    for (line = 0; line < ALINES; line++) {
        for (sample = 0; sample < SAMPLESPERLINE; sample++) {
            a << (float)rand() * 100 / RAND_MAX << " ";
        }
        a << endl;
    }
    a.close();

    ofstream b("b.txt");
    if (!b.is_open()) {
        cerr << "ERROR: Unable to open output file";
        exit(EXIT_FAILURE);
    }
    for (line = 0; line < BLINES; line++) {
        for (sample = 0; sample < SAMPLESPERLINE; sample++) {
            b << (float)rand() * 100 / RAND_MAX << " ";
        }
        b << endl;
    }
    b.close();
}
Here is the preprocess.cpp code:
#include <sstream>
#include <fstream>
#include <string>
#include <iostream>
#include <stdlib.h>
#include <vector>
#include <cmath>
#include "common.h"

int main(int argc, char* argv[]) {
    std::ifstream btxt("b.txt");
    std::ofstream bbin("B", std::ios::out | std::ios::binary);
    if (!btxt.is_open()) {
        std::cerr << "ERROR: Unable to open input file";
        exit(EXIT_FAILURE);
    }
    if (!bbin.is_open()) {
        std::cerr << "ERROR: Unable to open output file";
        exit(EXIT_FAILURE);
    }
    int l = 0;
    std::string line;
    std::vector<float> v;
    v.resize(SAMPLESPERLINE + 1);
    while (std::getline(btxt, line)) {
        std::istringstream iss(line);
        float f;
        double magnitude = 0.0;
        int s = 0;
        while (iss >> f) {
            v[s] = f;
            magnitude += (double)f * f;
            s++;
        }
        // Append the magnitude to the end of the "line"
        v[s] = (float)sqrt(magnitude);
        // Write the samples and magnitude in binary to the output file
        bbin.write(reinterpret_cast<char*>(&v[0]), (SAMPLESPERLINE + 1) * sizeof(float));
        l++;
    }
    btxt.close();
    bbin.close();
    return EXIT_SUCCESS;
}
Here is the common.h file:
const int ALINES=400000;
const int BLINES=40000;
const int SAMPLESPERLINE=50;
Here is the process.cpp code:
#include <sstream>
#include <fstream>
#include <string>
#include <iostream>
#include <stdlib.h>
#include <vector>
#include <array>
#include <cmath>
#include "common.h"

int main(int argc, char* argv[]) {
    if (argc != 2) {
        std::cerr << "Usage: process JOBNUM" << std::endl;
        exit(1);
    }
    int JobNum = std::atoi(argv[1]);
    std::cerr << "Starting job: " << JobNum << std::endl;

    // Load B
    std::ifstream bbin("B", std::ios::binary);
    if (!bbin.is_open()) {
        std::cerr << "ERROR: Unable to open B";
        exit(EXIT_FAILURE);
    }
    std::vector<std::array<float, SAMPLESPERLINE + 1>> B;
    B.resize(BLINES);
    for (int l = 0; l < BLINES; l++) {
        // Read one record of 50 floats and their magnitude
        bbin.read(reinterpret_cast<char*>(&B[l][0]), sizeof(float) * (SAMPLESPERLINE + 1));
    }
    bbin.close();

    // Process all lines read from stdin, each line prepended by its line number
    // Format is:
    // <line number in file "a.txt"> <SAMPLE0> <SAMPLE1> ... <SAMPLE49>
    int nLines = 0;
    std::string line;
    while (std::getline(std::cin, line)) {
        nLines++;
        std::istringstream iss(line);
        std::vector<float> A;
        A.resize(SAMPLESPERLINE);
        float f;
        int Alineno;
        int s = 0;
        iss >> Alineno;
        double dMag = 0.0;
        while (iss >> f) {
            A[s++] = f;
            dMag += (double)f * f;
        }
        // Root magnitude
        float AMagnitude = (float)sqrt(dMag);
        // At this point we have in B, 40,000 records each of 50 samples followed by the magnitude
        // ... and we have a single record from "a.txt" with 50 samples and its magnitude in AMagnitude
        // ... and Alineno is the absolute line number in "a.txt" of this line
        // Time to do the actual calculation: compare this record to all records in B
        for (int brec = 0; brec < BLINES; brec++) {
            float BMagnitude = B[brec][SAMPLESPERLINE];
            double dotproduct = 0.0;
            float *a = &A[0];
            float *b = &B[brec][0];
            for (s = 0; s < SAMPLESPERLINE; s++) {
                dotproduct += (*a++) * (*b++);
            }
            float similarity = dotproduct / (AMagnitude * BMagnitude);
            if (similarity > 0.99) {
                std::cout << "Line A: " << Alineno << ", line B: " << brec << ", similarity:" << similarity << std::endl;
            }
        }
    }
    std::cerr << "Ending job: " << JobNum << ", processed " << nLines << " lines" << std::endl;
    return EXIT_SUCCESS;
}
The Makefile is very simple:
CFLAGS = -std=c++11 -O3 -march=native

all: generate preprocess process

generate: generate.cpp
	clang++ ${CFLAGS} generate.cpp -o generate

preprocess: preprocess.cpp
	clang++ ${CFLAGS} preprocess.cpp -o preprocess

process: process.cpp
	clang++ ${CFLAGS} process.cpp -o process
When you run it, it pins the CPUs for 2 minutes and it looks like this:
time ./go
Starting job: 3
Starting job: 7
Starting job: 8
Starting job: 2
Starting job: 5
Starting job: 1
Starting job: 4
Starting job: 6
Ending job: 1, processed 5204 lines
Starting job: 9
Ending job: 2, processed 5203 lines
Ending job: 3, processed 5204 lines
Starting job: 11
Starting job: 10
Ending job: 4, processed 5204 lines
Starting job: 12
Ending job: 5, processed 5203 lines
Ending job: 6, processed 5203 lines
Starting job: 14
Starting job: 13
...
...
Starting job: 75
Ending job: 68, processed 5204 lines
Ending job: 69, processed 5203 lines
Starting job: 76
Starting job: 77
Ending job: 70, processed 5203 lines
Ending job: 71, processed 5204 lines
Ending job: 72, processed 5203 lines
Ending job: 77, processed 4535 lines
Ending job: 74, processed 5204 lines
Ending job: 73, processed 5205 lines
Ending job: 75, processed 5204 lines
Ending job: 76, processed 5203 lines
real 2m17.510s
user 16m24.533s
sys 0m4.426s
Note that I didn't do any explicit SIMD or loop unrolling, nor use any intrinsics to form the dot product. I suspect that if you asked a question about forming a dot product and tagged it simd or avx, somebody would help you optimise it.
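As a point of reference, optimised libraries already do that work for you: NumPy's `np.dot`, for instance, typically dispatches to SIMD-vectorised BLAS code. A small sketch (function name and demo data are illustrative) comparing it against a transliteration of the scalar inner loop above:

```python
import numpy as np

# The scalar inner loop from process.cpp, transliterated to Python:
# accumulate the products one element at a time.
def dot_scalar(a, b):
    total = 0.0
    for x, y in zip(a, b):
        total += x * y
    return total

rng = np.random.default_rng(1)
a = rng.random(50)
b = rng.random(50)

# np.dot typically runs SIMD-vectorised library code, giving the
# unrolled/vectorised dot product without hand-written intrinsics.
fast = np.dot(a, b)
slow = dot_scalar(a, b)
assert np.isclose(fast, slow)
```

The two agree to floating-point tolerance; the library version is the one you would reach for in practice.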
Also note that you can easily run this code across multiple machines with GNU Parallel, assuming you have ssh logins to them, by just using:
parallel -S host1,host2,host3 ....
For example, I have a 6-core Debian PC on my network, so I ran the code above parallelised across my 4-core Mac and the 6-core Debian machine with:
parallel -S :,debian ...
and then it takes 1 minute 8 seconds.