我如何将文本文件分解成更小的块(在 Unix 上使用 C++)?
How could I decompose a text file in smaller chunks (using C++ on Unix)?
我正在做一项学校作业,我必须使顺序代码并发。代码所做的称为 MapReduce,更具体地说,它计算所有单词在所有输入文件中出现的次数。
- 输入:包含所有要处理的文本文件的目录。
- 输出:包含带有结果的文本文件的目录。
我们给出的实现由 4 个阶段组成;拆分地图洗牌和减少。前 3 个必须在它们之间同时发生,当所有 3 个都完成时,Reduce 就会发生。
并发,除了发生在这 3 个阶段之间,还必须发生在每个阶段内。为了做到这一点,我们被告知在一定数量的线程之间划分工作(每个阶段都有它的线程)。
From now on I am going to talk only about the Split and Map stage, which are
the ones I am having trouble with right now.
Split 和 Map 阶段将各自有一个线程 'partition'(它们将具有相同数量的线程,因为每个 Split 线程将关联一个 Map 线程),并且将有一个 'partition' 对于输入目录中的每个文件,除了大文件,我必须每 8 MB 生成一个 'partition'。
This exception is what I am trying to solve, let me explain one more thing and I'll get to it.
由于 Split 和 Map 必须具有相同数量的线程,我所做的是创建一个函数来计算必须创建的分区数,以便能够告诉 Split 和 Map 有多少当我启动它们时,它们必须创建线程吗?
Split 线程所做的是获取已分配给它们的 'partition'(如果其大小小于 8 MB,它将是一个完整的文件)并将其逐行发送到其关联的 Map 线程(通过将它们写入线程安全队列)。
好的,这就是我的问题,我想让 Map 线程(从提到的队列中读取并处理每一行,现在无关紧要)从队列中读取直到它们读取 EOF,这意味着它的相关拆分线程已结束将其 'partition' 写入队列,但这仅适用于代表整个文件的 'partitions'。
所以,我的问题是:
- 我必须使用哪些选项才能将文件分解为 8 MB 的块?
- 我如何才能让线程知道何时停止读取?
- 由于有一小部分时间 Map 线程将尝试从队列中读取,但 Split 线程还没有写入任何内容,我如何才能让它们 'wait' 成为某些东西写入队列?
这是计算需要多少线程的函数,我想让它生成一个文件描述符数组,每个文件描述符包含一个 'partition'
int MapReduce::getSplitNum(){
int split_num = 0;
char file_path[264];
DIR* dir = opendir(InputPath);
struct dirent* entity;
unsigned char isFile =0x8; // El valor que pren entity->d_type quan es tracta d'un fitxer
while ((entity = readdir(dir)) != NULL)
{
// Evitem el directori que conté els fitxers (.) i el directori anterior (..).
if( strcmp(entity->d_name, ".")!=0 && strcmp(entity->d_name, "..")!=0 && entity->d_type == isFile )
{
struct stat file_status;
sprintf(file_path,"%s/%s",InputPath, entity->d_name);
stat(file_path, &file_status);
long file_size = file_status.st_size;
// DEBUG: printf("Fitxer: %s\t Mida: %ld\n", entity->d_name, file_status.st_size);
if (file_size < MAX_SPLIT_SIZE)
{
split_num++;
}
else
{
long restant = file_size - MAX_SPLIT_SIZE;
split_num = split_num + 2; // Sumem 2 perquè al ser un arxiu gran com a mínim usarà 2 splits, més els que vagi afegint el bucle while.
while (restant > MAX_SPLIT_SIZE)
{
restant = restant - MAX_SPLIT_SIZE;
split_num++;
}
}
}
}
closedir(dir);
return split_num;
}
我想分解文件的方法有很多种,但我感兴趣的是 'good practice' 哪一种。
谢谢!
也许是这样的?这个函数将一个文件分解成几等份,最后一个是不同的(文件的其余部分)。这可以修改为将文件分解为特定大小的块。我刚刚写了这个,它似乎对我有用,但当然还需要进一步测试。当然,我不确定这是最佳解决方案。这会产生 n 个文件,名称为 0, 1, ..., n.
#include <iostream>
#include <fstream>
void decompose_file (const std::string& filepath, const int number_of_files) {
std::ifstream infile(filepath);
infile.seekg(0, std::ios::end);
size_t length = infile.tellg();
infile.seekg(0, std::ios::beg);
for (int i = 0; i < number_of_files; ++i) {
char * buffer;
size_t chunk_size = 0;
if (i != number_of_files - 1) {
chunk_size = length / number_of_files;
}
else {
chunk_size = length - ((number_of_files - 1) * (length / number_of_files));
}
buffer = new char[chunk_size];
infile.read (buffer, chunk_size);
std::ofstream outfile (std::to_string(i));
if (outfile.is_open()) {
outfile.write(buffer, chunk_size);
outfile.close();
}
delete[] buffer;
}
infile.close();
}
int main (int argc, char* argv[]) {
decompose_file("my_file.txt", 4);
return 0;
}
我正在做一项学校作业,我必须使顺序代码并发。代码所做的称为 MapReduce,更具体地说,它计算所有单词在所有输入文件中出现的次数。
- 输入:包含所有要处理的文本文件的目录。
- 输出:包含带有结果的文本文件的目录。
我们给出的实现由 4 个阶段组成;拆分地图洗牌和减少。前 3 个必须在它们之间同时发生,当所有 3 个都完成时,Reduce 就会发生。
并发,除了发生在这 3 个阶段之间,还必须发生在每个阶段内。为了做到这一点,我们被告知在一定数量的线程之间划分工作(每个阶段都有它的线程)。
From now on I am going to talk only about the Split and Map stage, which are the ones I am having trouble with right now.
Split 和 Map 阶段将各自有一个线程 'partition'(它们将具有相同数量的线程,因为每个 Split 线程将关联一个 Map 线程),并且将有一个 'partition' 对于输入目录中的每个文件,除了大文件,我必须每 8 MB 生成一个 'partition'。
This exception is what I am trying to solve, let me explain one more thing and I'll get to it.
由于 Split 和 Map 必须具有相同数量的线程,我所做的是创建一个函数来计算必须创建的分区数,以便能够告诉 Split 和 Map 有多少当我启动它们时,它们必须创建线程吗?
Split 线程所做的是获取已分配给它们的 'partition'(如果其大小小于 8 MB,它将是一个完整的文件)并将其逐行发送到其关联的 Map 线程(通过将它们写入线程安全队列)。
好的,这就是我的问题,我想让 Map 线程(从提到的队列中读取并处理每一行,现在无关紧要)从队列中读取直到它们读取 EOF,这意味着它的相关拆分线程已结束将其 'partition' 写入队列,但这仅适用于代表整个文件的 'partitions'。
所以,我的问题是:
- 我必须使用哪些选项才能将文件分解为 8 MB 的块?
- 我如何才能让线程知道何时停止读取?
- 由于有一小部分时间 Map 线程将尝试从队列中读取,但 Split 线程还没有写入任何内容,我如何才能让它们 'wait' 成为某些东西写入队列?
这是计算需要多少线程的函数,我想让它生成一个文件描述符数组,每个文件描述符包含一个 'partition'
int MapReduce::getSplitNum(){
int split_num = 0;
char file_path[264];
DIR* dir = opendir(InputPath);
struct dirent* entity;
unsigned char isFile =0x8; // El valor que pren entity->d_type quan es tracta d'un fitxer
while ((entity = readdir(dir)) != NULL)
{
// Evitem el directori que conté els fitxers (.) i el directori anterior (..).
if( strcmp(entity->d_name, ".")!=0 && strcmp(entity->d_name, "..")!=0 && entity->d_type == isFile )
{
struct stat file_status;
sprintf(file_path,"%s/%s",InputPath, entity->d_name);
stat(file_path, &file_status);
long file_size = file_status.st_size;
// DEBUG: printf("Fitxer: %s\t Mida: %ld\n", entity->d_name, file_status.st_size);
if (file_size < MAX_SPLIT_SIZE)
{
split_num++;
}
else
{
long restant = file_size - MAX_SPLIT_SIZE;
split_num = split_num + 2; // Sumem 2 perquè al ser un arxiu gran com a mínim usarà 2 splits, més els que vagi afegint el bucle while.
while (restant > MAX_SPLIT_SIZE)
{
restant = restant - MAX_SPLIT_SIZE;
split_num++;
}
}
}
}
closedir(dir);
return split_num;
}
我想分解文件的方法有很多种,但我感兴趣的是 'good practice' 哪一种。
谢谢!
也许是这样的?这个函数将一个文件分解成几等份,最后一个是不同的(文件的其余部分)。这可以修改为将文件分解为特定大小的块。我刚刚写了这个,它似乎对我有用,但当然还需要进一步测试。当然,我不确定这是最佳解决方案。这会产生 n 个文件,名称为 0, 1, ..., n.
#include <iostream>
#include <fstream>
void decompose_file (const std::string& filepath, const int number_of_files) {
std::ifstream infile(filepath);
infile.seekg(0, std::ios::end);
size_t length = infile.tellg();
infile.seekg(0, std::ios::beg);
for (int i = 0; i < number_of_files; ++i) {
char * buffer;
size_t chunk_size = 0;
if (i != number_of_files - 1) {
chunk_size = length / number_of_files;
}
else {
chunk_size = length - ((number_of_files - 1) * (length / number_of_files));
}
buffer = new char[chunk_size];
infile.read (buffer, chunk_size);
std::ofstream outfile (std::to_string(i));
if (outfile.is_open()) {
outfile.write(buffer, chunk_size);
outfile.close();
}
delete[] buffer;
}
infile.close();
}
int main (int argc, char* argv[]) {
decompose_file("my_file.txt", 4);
return 0;
}