使用目录扫描运算符处理具有特定扩展名的大文件
Working with large files with a particular extension using directory scan operator
我有一个 1GB+ 大小的文件从 MQ 进入我的目录,这需要一些时间才能完全传输该文件,但是即使文件不是完整的,也会在该目录中生成一个文件。
恐怕我的 directoryScan 操作员会选择一个不完整的文件。
另外,我无法添加初始延迟,因为我不确定传输文件需要多少时间。
PS:我在某处读到一些文件传输协议通过向文件添加不同的扩展名来解决这个问题,直到它完成。所以说我的 directoryScan 操作员正在等待任何扩展名为 .txt 的文件,因此此文件传输协议将创建一个扩展名为 .abc 的文件,直到传输完成。
我应该如何进行呢?
如果您打算使用正则表达式路由,这里是调用运算符仅读取特定扩展名的文件的示例:
// DirectoryScan operator with an absolute file argument and a file name pattern
stream<rstring name> Dir2 = DirectoryScan()
{
param
directory : "/tmp/work";
pattern : "\.txt$";
}
如果这不起作用,是否可以设置 MQ 将文件写入不同的目录,然后在完成后将其移动到您的目标目录?
如果您知道文件的大小,您可以做的一件事是使用 Size()
函数忽略该文件,直到它达到正确的大小。此代码段使用 Custom
运算符等待文件大小至少为 2000 字节。
graph
stream <rstring filename, uint64 size> DirScanOutput = DirectoryScan() {
param
directory: "test1";
sleepTime: 10.0; //wait 10s between scans
pattern: ".*\.txt";
output DirScanOutput : size= Size();
}
stream<rstring file> FileNameStream= Custom(DirScanOutput as In){
logic
onTuple In:{
if (size < 2000ul){
printStringLn("Required size not met yet.");
} else {
printStringLn("Size of file reached.");
submit({file=filename}, FileNameStream);
}
}
}
stream <cityData> CityDataRecord = FileSource(FileNameStream) {
param
format: csv;
}
我希望这些建议中的一个对你有用。
我有一个 1GB+ 大小的文件从 MQ 进入我的目录,这需要一些时间才能完全传输该文件,但是即使文件不是完整的,也会在该目录中生成一个文件。 恐怕我的 directoryScan 操作员会选择一个不完整的文件。 另外,我无法添加初始延迟,因为我不确定传输文件需要多少时间。
PS:我在某处读到一些文件传输协议通过向文件添加不同的扩展名来解决这个问题,直到它完成。所以说我的 directoryScan 操作员正在等待任何扩展名为 .txt 的文件,因此此文件传输协议将创建一个扩展名为 .abc 的文件,直到传输完成。
我应该如何进行呢?
如果您打算使用正则表达式路由,这里是调用运算符仅读取特定扩展名的文件的示例:
// DirectoryScan operator with an absolute file argument and a file name pattern
stream<rstring name> Dir2 = DirectoryScan()
{
param
directory : "/tmp/work";
pattern : "\.txt$";
}
如果这不起作用,是否可以设置 MQ 将文件写入不同的目录,然后在完成后将其移动到您的目标目录?
如果您知道文件的大小,您可以做的一件事是使用 Size()
函数忽略该文件,直到它达到正确的大小。此代码段使用 Custom
运算符等待文件大小至少为 2000 字节。
graph
stream <rstring filename, uint64 size> DirScanOutput = DirectoryScan() {
param
directory: "test1";
sleepTime: 10.0; //wait 10s between scans
pattern: ".*\.txt";
output DirScanOutput : size= Size();
}
stream<rstring file> FileNameStream= Custom(DirScanOutput as In){
logic
onTuple In:{
if (size < 2000ul){
printStringLn("Required size not met yet.");
} else {
printStringLn("Size of file reached.");
submit({file=filename}, FileNameStream);
}
}
}
stream <cityData> CityDataRecord = FileSource(FileNameStream) {
param
format: csv;
}
我希望这些建议中的一个对你有用。