Add timestamp in output file name
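We have a long-running pipeline, and we want to add a timestamp to the output file name that is as close as possible to the time the pipeline finishes.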
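The solution we came up with is to use a FilenamePolicy. It seems to work fine, but it always logs a warning complaining about deleting the temporary files, even though the files are deleted successfully. We are using Apache Beam 2.31.0.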
{"@timestamp":"2022-04-26T16:39:39.182-04:00","@version":"1","message":"Failed to match temporary files under: [C:\Users\userId\Desktop\tbd\output\.temp-beam-1deb0fa8-96fe-4989-b5aa-ea508906cb96\].","logger_name":"org.apache.beam.sdk.io.FileBasedSink","thread_name":"direct-runner-worker","severity":"WARN","level_value":30000}
Here is the code snippet:
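import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.checkerframework.checker.nullness.qual.Nullable;

public class TimestampedOutput {
  // Not shown in the original snippet; this pattern is an assumption.
  // It avoids ':' because that character is not allowed in Windows file names.
  private static final DateTimeFormatter DATE_TIME_FORMATTER =
      DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss");

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline pipeline = Pipeline.create(options);
    String now = DATE_TIME_FORMATTER.format(LocalDateTime.now());
    System.out.println("current time: " + now);

    // Backslashes must be escaped in Java string literals ("\U" does not compile).
    TextIO.Write writer = TextIO.write().withNumShards(1)
        .withTempDirectory(
            FileSystems.matchNewResource("C:\\Users\\userId\\Desktop\\tbd\\output", true))
        .to(new FilenamePolicy() {
          @Override
          public ResourceId windowedFilename(int shardNumber, int numShards, BoundedWindow window,
              PaneInfo paneInfo, OutputFileHints outputFileHints) {
            throw new RuntimeException("not implemented");
          }

          @Override
          public @Nullable ResourceId unwindowedFilename(int shardNumber, int numShards,
              OutputFileHints outputFileHints) {
            // LocalDateTime.now() runs when Beam calls this method, not at pipeline construction.
            String time = DATE_TIME_FORMATTER.format(LocalDateTime.now());
            String filename = String.format(
                "%s-%s-of-%s%s",
                "C:\\Users\\userId\\Desktop\\tbd\\output\\file-" + time,
                shardNumber,
                numShards,
                outputFileHints.getSuggestedFilenameSuffix());
            return FileSystems.matchNewResource(filename, false);
          }
        });

    pipeline.apply(Create.of("test", "test2", "test3")).apply(ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void process(ProcessContext c) {
        try {
          // Simulate a long-running step: 2 minutes per element.
          Thread.sleep(120000);
          c.output(c.element());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // restore the interrupt flag
          e.printStackTrace();
        }
      }
    })).apply(writer);
    pipeline.run().waitUntilFinish();
  }
}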
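The naming logic of the policy above can be checked outside Beam. The sketch below builds a name with the same "%s-%s-of-%s%s" layout that unwindowedFilename uses, with a fixed time so the result is predictable. The class TimestampedName, the method build, and the pattern yyyyMMdd-HHmmss are assumptions for illustration, since the question does not show how DATE_TIME_FORMATTER is defined; the pattern deliberately contains no ':' because Windows rejects that character in file names.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TimestampedName {
    // Assumed pattern (hypothetical): no ':' so the result is a legal Windows file name.
    static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss");

    // Same layout as unwindowedFilename in the question:
    // <prefix><timestamp>-<shardNumber>-of-<numShards><suffix>
    static String build(String prefix, LocalDateTime time, int shardNumber, int numShards, String suffix) {
        return String.format("%s-%s-of-%s%s", prefix + FORMATTER.format(time), shardNumber, numShards, suffix);
    }

    public static void main(String[] args) {
        // In the pipeline, time would be LocalDateTime.now() at the moment Beam asks for the name.
        System.out.println(build("file-", LocalDateTime.of(2022, 4, 26, 16, 39, 39), 0, 1, ".txt"));
        // prints file-20220426-163939-0-of-1.txt
    }
}
```

Because the timestamp is computed inside the method rather than captured once (like the now variable in main), the name reflects when Beam requests it, which is what pushes it toward the end of the run.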
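This happens because Windows cannot resolve the “*” wildcard in the path being matched; the resulting exception is ignored and this warning is logged instead. The temporary files the sink already knows it wrote are still deleted, which is why the cleanup succeeds despite the warning. See the comment.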
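As far as we can tell, the cleanup deletes the union of two sets: whatever a wildcard match on the temp directory returns, and the files the sink already knows it produced. The sketch below is a simplified, hypothetical model of that idea in plain Java, not Beam's actual code: TempCleanupSketch and removeTemporaryFiles are made-up names, and the wildcard match is passed in as a Supplier so its Windows failure can be simulated on any OS.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

public class TempCleanupSketch {

    // Hypothetical model: delete wildcard matches plus known files.
    // A failing wildcard match only produces a warning. Returns the number of files deleted.
    static int removeTemporaryFiles(Supplier<List<Path>> wildcardMatch, Collection<Path> knownFiles) {
        Set<Path> toDelete = new HashSet<>();
        try {
            toDelete.addAll(wildcardMatch.get());
        } catch (RuntimeException e) {
            // Plays the role of the WARN "Failed to match temporary files under: [...]"
            System.err.println("WARN: failed to match temporary files: " + e.getMessage());
        }
        // Known files are added whether or not the wildcard match succeeded.
        toDelete.addAll(knownFiles);

        int deleted = 0;
        for (Path p : toDelete) {
            try {
                if (Files.deleteIfExists(p)) {
                    deleted++;
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return deleted;
    }
}
```

For example, passing a supplier that throws InvalidPathException (which Paths.get raises on Windows for a path containing “*”) prints the warning and still deletes every file in knownFiles, matching the behavior described in the question.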