How to use FileIO.writeDynamic() in Apache Beam 2.6 to write to multiple output paths?

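I am using Apache Beam 2.6 to read from a single Kafka topic and write the output to Google Cloud Storage (GCS). Now I want to change the pipeline so that it reads multiple topics and writes each of them out as gs://bucket/topic/...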

When I was reading only one topic, I used TextIO in the last step of the pipeline:

TextIO.write()
    .to(
        new DateNamedFiles(
            String.format("gs://bucket/data%s/", suffix), currentMillisString))
    .withWindowedWrites()
    .withTempDirectory(
        FileBasedSink.convertToFileResourceIfPossible(
            String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)))
    .withNumShards(1);

There is a similar question whose code I tried to adapt:

FileIO.<EventType, Event>writeDynamic()
    .by(
        new SerializableFunction<Event, EventType>() {
          @Override
          public EventType apply(Event input) {
            return EventType.TRANSFER; // should return real type here, just a dummy
          }
        })
    .via(
        Contextful.fn(
            new SerializableFunction<Event, String>() {
              @Override
              public String apply(Event input) {
                return "Dummy"; // should return the Event converted to a String
              }
            }),
        TextIO.sink())
    .to(DynamicFileDestinations.constant(new DateNamedFiles("gs://bucket/tmp%s/%s/",
                                                            currentMillisString),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String input) {
            return null; // Not sure what exactly this should return, but it needs to
                         // include the EventType in the path
          }
        }))
    .withTempDirectory(
        FileBasedSink.convertToFileResourceIfPossible(
            String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)))
    .withNumShards(1);

The official JavaDoc contains example code that seems to use outdated method signatures (the .via method seems to have switched the order of its arguments). I also stumbled across the example in FileIO, which confused me: shouldn't TransactionType and Transaction in this line switch places?

After a night's sleep and a fresh start, I found the solution. I used the functional Java 8 style because it makes the code shorter (and more readable):

  .apply(
    FileIO.<String, Event>writeDynamic()
        .by((SerializableFunction<Event, String>) input -> input.getTopic())
        .via(
            Contextful.fn(
                (SerializableFunction<Event, String>) input -> input.getPayload()),
            TextIO.sink())
        .to(String.format("gs://bucket/data%s/", suffix))
        .withNaming(type -> FileNaming.getNaming(type, "", currentMillisString))
        .withDestinationCoder(StringUtf8Coder.of())
        .withTempDirectory(
            String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString))
        .withNumShards(1));
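To make the roles of `by` and `via` concrete, here is a plain-Java model (no Beam dependency) of what the transform above does with each element: `by` maps an element to its destination (the topic), `via` maps it to the line that is written, and since the custom naming puts the topic at the front of every file name, each topic ends up under its own sub-directory of the `.to(...)` base path. The `Event` class, the `route` helper and the topic names are hypothetical stand-ins for illustration, not Beam API; the type parameters are ordered `<DestinationT, UserT>` to mirror `FileIO.<String, Event>writeDynamic()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

public class DynamicRoutingSketch {

  // Hypothetical stand-in for the Event POJO: the Kafka topic plus the raw payload.
  static final class Event {
    private final String topic;
    private final String payload;

    Event(String topic, String payload) {
      this.topic = topic;
      this.payload = payload;
    }

    String getTopic() {
      return topic;
    }

    String getPayload() {
      return payload;
    }
  }

  // Hypothetical helper modelling writeDynamic() in plain Java (no Beam involved):
  // 'by' picks each element's destination, 'via' renders the line that gets written,
  // and lines are grouped per output directory baseDir + destination + "/".
  // Type parameters use the same order as FileIO.writeDynamic(): <DestinationT, UserT>.
  static <DestinationT, UserT> Map<String, List<String>> route(
      List<UserT> elements,
      Function<UserT, DestinationT> by,
      Function<UserT, String> via,
      String baseDir) {
    Map<String, List<String>> linesByDirectory = new TreeMap<>();
    for (UserT element : elements) {
      String directory = baseDir + by.apply(element) + "/";
      linesByDirectory.computeIfAbsent(directory, d -> new ArrayList<>()).add(via.apply(element));
    }
    return linesByDirectory;
  }

  public static void main(String[] args) {
    String suffix = "dev"; // "dev" or empty, as described in the answer
    List<Event> events =
        List.of(
            new Event("orders", "{\"id\":1}"),
            new Event("payments", "{\"id\":2}"),
            new Event("orders", "{\"id\":3}"));

    Map<String, List<String>> routed =
        DynamicRoutingSketch.<String, Event>route(
            events, Event::getTopic, Event::getPayload,
            String.format("gs://bucket/data%s/", suffix));

    // Prints one line per topic directory together with the payloads routed to it.
    routed.forEach((dir, lines) -> System.out.println(dir + " -> " + lines));
  }
}
```

In the real pipeline Beam additionally splits each destination by window and shard; the model only shows how elements are partitioned by topic.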

Explanation:

  • Event is a Java POJO containing the payload of the Kafka message and the topic it belongs to; it is parsed in a ParDo after the KafkaIO step
  • suffix is either dev or empty and is set via an environment variable
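  • currentMillisString contains the timestamp at which the whole pipeline run was started, so that new files do not overwrite old files on GCS when the pipeline is restarted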
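  • FileNaming implements the custom naming and receives the type of the event (its topic) in its constructor; it uses a custom formatter to write into daily-partitioned "sub-folders" on GCS: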

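    import java.util.TimeZone;
    import org.apache.beam.sdk.io.Compression;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
    import org.apache.beam.sdk.transforms.windowing.PaneInfo;
    import org.joda.time.DateTimeZone;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    // Names files <topic>/<yyyy-MM-dd of window start>/<currentMillis>_pane-...-of-...<suffix>
    class FileNaming implements FileIO.Write.FileNaming {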
      static FileNaming getNaming(String topic, String suffix, String currentMillisString) {
        return new FileNaming(topic, suffix, currentMillisString);
      }
    
      private static final DateTimeFormatter FORMATTER = DateTimeFormat
          .forPattern("yyyy-MM-dd").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("Europe/Zurich")));
    
      private final String topic;
      private final String suffix;
      private final String currentMillisString;
    
      private String filenamePrefixForWindow(IntervalWindow window) {
        return String.format(
            "%s/%s/%s_", topic, FORMATTER.print(window.start()), currentMillisString);
      }
    
      private FileNaming(String topic, String suffix, String currentMillisString) {
        this.topic = topic;
        this.suffix = suffix;
        this.currentMillisString = currentMillisString;
      }
    
      @Override
      public String getFilename(
          BoundedWindow window,
          PaneInfo pane,
          int numShards,
          int shardIndex,
          Compression compression) {
    
        IntervalWindow intervalWindow = (IntervalWindow) window;
        String filenamePrefix = filenamePrefixForWindow(intervalWindow);
        String filename =
            String.format(
                "pane-%d-%s-%05d-of-%05d%s",
                pane.getIndex(),
                pane.getTiming().toString().toLowerCase(),
                shardIndex,
                numShards,
                suffix);
        String fullName = filenamePrefix + filename;
        return fullName;
      }
    }
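As a sanity check of the naming scheme, the sketch below reproduces the string that `getFilename` builds, using `java.time` instead of Joda-Time and plain values in place of Beam's `IntervalWindow.start()` and `PaneInfo`. The `FileNamingSketch` class, the `fileName` helper, the topic and the timestamps are hypothetical; the format strings are copied from the class above.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class FileNamingSketch {

  // Same pattern and zone as the Joda FORMATTER above ("yyyy-MM-dd", Europe/Zurich).
  private static final DateTimeFormatter FORMATTER =
      DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("Europe/Zurich"));

  // Mirrors filenamePrefixForWindow() + getFilename(); windowStart, paneIndex and
  // paneTiming are plain stand-ins for IntervalWindow.start() and PaneInfo.
  static String fileName(
      String topic,
      String currentMillisString,
      Instant windowStart,
      long paneIndex,
      String paneTiming,
      int numShards,
      int shardIndex,
      String suffix) {
    String prefix =
        String.format("%s/%s/%s_", topic, FORMATTER.format(windowStart), currentMillisString);
    String name =
        String.format(
            "pane-%d-%s-%05d-of-%05d%s",
            paneIndex, paneTiming.toLowerCase(Locale.ROOT), shardIndex, numShards, suffix);
    return prefix + name;
  }

  public static void main(String[] args) {
    // 2018-09-03T22:30Z is already 2018-09-04 00:30 in Zurich (UTC+2 in September).
    Instant windowStart = Instant.parse("2018-09-03T22:30:00Z");
    // Prints: orders/2018-09-04/1535900000000_pane-0-on_time-00000-of-00001
    System.out.println(
        fileName("orders", "1535900000000", windowStart, 0, "ON_TIME", 1, 0, ""));
  }
}
```

The example also shows why the zone matters: the daily "sub-folders" follow Zurich local dates, so a window starting late in the UTC evening lands in the next day's folder.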