Writing log files from Pub/Sub logs to a bucket with custom directory and file names based on log time

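I am parsing logs from Pub/Sub, with the goal of placing them in hourly files at a custom location that is itself based on the log timestamp (a field in the Pub/Sub log).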

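Each file should contain all the data for a particular hour, and records should keep being appended to that hour's file. For example: gs://bucket/applog/2017-09-27/application1/app-2017-09-27-11H.log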
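To make the target layout concrete, here is a minimal sketch of how such a path could be derived from a log timestamp using only java.time. The class name HourlyLogPath, the method forLogTime, and the input pattern "uuuu-MM-dd HH:mm:ss.SSS" are assumptions made for illustration (the real pattern is whatever CommonConstants.DATE_FORMAT_YYYYMMDD_HHMMSS_SSS holds), and the timestamp is taken as already being in UTC.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Beam or of the code in this question.
final class HourlyLogPath {
    // Assumed format of the log-time field; adjust to the real constant.
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss.SSS");
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("uuuu-MM-dd");
    private static final DateTimeFormatter DAY_HOUR =
            DateTimeFormatter.ofPattern("uuuu-MM-dd-HH");

    // Builds <root>/<day>/<application>/app-<day>-<hour>H.log from a log timestamp.
    static String forLogTime(String bucketRoot, String application, String logTime) {
        LocalDateTime t = LocalDateTime.parse(logTime, LOG_TIME);
        return bucketRoot + "/" + DAY.format(t) + "/" + application
                + "/app-" + DAY_HOUR.format(t) + "H.log";
    }

    public static void main(String[] args) {
        // prints gs://bucket/applog/2017-09-27/application1/app-2017-09-27-11H.log
        System.out.println(
                forLogTime("gs://bucket/applog", "application1", "2017-09-27 11:42:07.123"));
    }
}
```

Every record whose log time falls between 11:00:00.000 and 11:59:59.999 on that day maps to the same path, which is the grouping the hourly files need.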

pushFilePColl.apply(Window.into(new FileTextIOWindowFn()))
    .apply("FileTO to LOG TextIO", ParDo.of(new TextIOWriteDoFn()))
    .apply(TextIO.write()
        .to(pipelineOptions.getFileStorageBucket())
        .withWindowedWrites()
        .withFilenamePolicy(new FileStorageFileNamePolicy(logTypeEnum))
        .withNumShards(10));

Custom window:

public class FileTextIOWindowFn extends NonMergingWindowFn<Object, IntervalWindow> {

/**
 * 
 */
private static final long serialVersionUID = 1L;

private IntervalWindow assignWindow(AssignContext context) {
    FilePushTO filePushTO = (FilePushTO) context.element();
    String timestamp = filePushTO.getLogTime();
    DateTimeFormatter formatter = DateTimeFormat.forPattern(CommonConstants.DATE_FORMAT_YYYYMMDD_HHMMSS_SSS)
            .withZoneUTC();
    Instant start_point = Instant.parse(timestamp, formatter);
    Calendar cal = DateUtil.getCurrentDateInUTC();
    SimpleDateFormat DATE_FORMATER_PARTITION_NAME = DateUtil.getDateFormater();
    Instant end_point = Instant.parse(DATE_FORMATER_PARTITION_NAME.format(cal.getTime()), formatter);
    return new IntervalWindow(start_point, end_point);
}

@Override
public Coder<IntervalWindow> windowCoder() {
    return IntervalWindow.getCoder();
}

@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
    return Arrays.asList(assignWindow(c));
}

@Override
public boolean isCompatible(WindowFn<?, ?> other) {
    return false;
}

@Override
public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
    throw new IllegalArgumentException(
            "Attempted to get side input window for GlobalWindow from non-global WindowFn");
}

}

Filename policy:

public class FileStorageFileNamePolicy extends FileBasedSink.FilenamePolicy {
/**
 * 
 */
private static final long serialVersionUID = 1L;

private static Logger LOGGER = LoggerFactory.getLogger(FileStorageFileNamePolicy.class);

private LogTypeEnum logTypeEnum;

public FileStorageFileNamePolicy(LogTypeEnum logTypeEnum) {
    this.logTypeEnum = logTypeEnum;
}

@Override
public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext context, String extension) {
    IntervalWindow window = (IntervalWindow) context.getWindow();
    String startDate = window.start().toString();
    String dateString = startDate.replace("T", CommonConstants.SPACE)
            .replaceAll(startDate.substring(startDate.indexOf("Z")), CommonConstants.EMPTY_STRING);
    String startDateHour = startDate;
    try {
        startDate = DateUtil.getDateForFileStore(dateString, null);
        startDateHour = DateUtil.getDTLocalTZHour(dateString, null);
    } catch (ParseException e) {
        LOGGER.error("Error converting date  : {}", e);
    }
    String filename = new StringBuilder(window.start().toString()).append(CommonConstants.COLON)
            .append(startDateHour).append(CommonConstants.UNDER_SCORE).append(context.getShardNumber())
            .append(".txt").toString();
    String dirName = new StringBuilder(startDate).append(CommonConstants.FORWARD_SLASH)
            .append(logTypeEnum.getValue().toLowerCase()).append(CommonConstants.FORWARD_SLASH).toString();
    LOGGER.info("Directory : {} and File Name : {}", dirName, filename);
    return outputDirectory.resolve(dirName, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
            .resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
}

@Override
public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context, String extension) {
    throw new UnsupportedOperationException("Unsupported.");
}

}

I built the custom window on top of IntervalWindow so that I can get the proper timestamp in the FilenamePolicy. I can't use FixedWindows, because it always gives me the current timestamp.
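For reference, the assignWindow method above starts each window at the element's exact log time and ends it at the current time, so two records from the same hour generally land in different windows. A minimal sketch of hour-aligned bounds, written with plain java.time rather than the Joda-Time types Beam uses, might look like the following. HourWindow and containing are hypothetical names, and this only illustrates the arithmetic, not a drop-in WindowFn.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical value class illustrating hour-aligned window bounds.
final class HourWindow {
    final Instant start; // inclusive
    final Instant end;   // exclusive

    private HourWindow(Instant start, Instant end) {
        this.start = start;
        this.end = end;
    }

    // Returns the one-hour interval [start, end) that contains logTime (UTC).
    static HourWindow containing(Instant logTime) {
        Instant start = logTime.truncatedTo(ChronoUnit.HOURS);
        return new HourWindow(start, start.plus(Duration.ofHours(1)));
    }

    public static void main(String[] args) {
        HourWindow w = containing(Instant.parse("2017-09-27T11:42:07.123Z"));
        System.out.println(w.start + " .. " + w.end);
    }
}
```

With bounds computed this way, every record from the same hour shares one window, so a filename derived from the window start is the same for all of them.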

Everything works here, except that the files are not appended to. They get overwritten.

You can do this with TextIO.write().to(...).withWindowedWrites(), which is available in Beam 2.1. See the TextIO javadoc.