Add a timestamp to the output file name

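We have a long-running pipeline, and we want to add a timestamp to the output file name that is as close as possible to the time the pipeline finishes. The solution we came up with is to use a FilenamePolicy. It appears to work, but it always logs a warning about deleting temporary files, even though the files are deleted successfully. We are using version 2.31.0.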

{"@timestamp":"2022-04-26T16:39:39.182-04:00","@version":"1","message":"Failed to match temporary files under: [C:\Users\userId\Desktop\tbd\output\.temp-beam-1deb0fa8-96fe-4989-b5aa-ea508906cb96\].","logger_name":"org.apache.beam.sdk.io.FileBasedSink","thread_name":"direct-runner-worker","severity":"WARN","level_value":30000}

Here is the code snippet:

    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline pipeline = Pipeline.create(options);
    String now = DATE_TIME_FORMATTER.format(
        LocalDateTime.now());
    System.out.println("current time" + now);
    Write writer = TextIO.write().withNumShards(1)
        .withTempDirectory(
            FileSystems.matchNewResource("C:\\Users\\userId\\Desktop\\tbd\\output", true))
        .to(new FilenamePolicy() {
          @Override
          public ResourceId windowedFilename(int shardNumber, int numShards, BoundedWindow window,
              PaneInfo paneInfo, OutputFileHints outputFileHints) {
            throw new RuntimeException("not implemented");
          }

          @Override
          public @Nullable ResourceId unwindowedFilename(int shardNumber, int numShards,
              OutputFileHints outputFileHints) {
            String time = DATE_TIME_FORMATTER.format(
                LocalDateTime.now());
            String filename =
                String.format(
                    "%s-%s-of-%s%s",
                    "C:\\Users\\userId\\Desktop\\tbd\\output\\file-" + time,
                    shardNumber,
                    numShards,
                    outputFileHints.getSuggestedFilenameSuffix());
            return FileSystems.matchNewResource(
                filename, false);
          }
          
        });
    pipeline.apply(Create.of("test", "test2", "test3")).apply(ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void process(ProcessContext c) {
        try {
          Thread.sleep(120000);
          c.output(c.element());
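        } catch (InterruptedException e) {
          // Restore the interrupt flag instead of silently swallowing the interruption.
          Thread.currentThread().interrupt();
          e.printStackTrace();
        }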
      }
    })).apply(writer);
    pipeline.run();

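This happens because Windows cannot resolve the "*" in the match path. The resulting exception is ignored, which produces this warning message. See the comment.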
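
The snippet in the question uses DATE_TIME_FORMATTER without showing its definition. As a minimal sketch, assuming a pattern such as "yyyyMMdd-HHmmss" (a guess, not the asker's actual formatter), the file name could be built as below. The pattern avoids ":" because that character is not allowed in Windows file names. The class name TimestampedName and the build helper are hypothetical and use only the JDK, so the naming logic can be checked without running a pipeline.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

class TimestampedName {
    // Assumed pattern: the original post does not show how DATE_TIME_FORMATTER is defined.
    // No ':' is used, since Windows does not allow that character in file names.
    static final DateTimeFormatter DATE_TIME_FORMATTER =
        DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss");

    // Mirrors the "%s-%s-of-%s%s" layout from the question: prefix + time, shard, shard count, suffix.
    static String build(String prefix, LocalDateTime when,
                        int shardNumber, int numShards, String suffix) {
        return String.format(
            "%s-%s-of-%s%s",
            prefix + DATE_TIME_FORMATTER.format(when),
            shardNumber,
            numShards,
            suffix);
    }

    public static void main(String[] args) {
        // Taking the time here, at the moment the name is built, matches the intent of
        // calling LocalDateTime.now() inside unwindowedFilename rather than at pipeline setup.
        System.out.println(build("file-", LocalDateTime.now(), 0, 1, ".txt"));
    }
}
```

Passing the time in as a parameter keeps the helper deterministic. Inside a FilenamePolicy, the result would still be handed to FileSystems.matchNewResource as in the question.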