Writing log files from pubsub logs to a bucket with custom directory and file names based on log time
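I am parsing logs from pubsub, with the goal of putting them into hourly files in a custom location that is itself based on the log timestamp (a field in the pubsub log).
Each file should get all the data for a particular hour, and it should keep being appended to throughout that hour.
For example: gs://bucket/applog/2017-09-27/application1/app-2017-09-27-11H.log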
pushFilePColl.apply(Window.into(new FileTextIOWindowFn()))
    .apply("FileTO to LOG TextIO", ParDo.of(new TextIOWriteDoFn()))
    .apply(TextIO.write().to(pipelineOptions.getFileStorageBucket()).withWindowedWrites()
        .withFilenamePolicy(new FileStorageFileNamePolicy(logTypeEnum)).withNumShards(10));
Custom window:
public class FileTextIOWindowFn extends NonMergingWindowFn<Object, IntervalWindow> {

    private static final long serialVersionUID = 1L;

    private IntervalWindow assignWindow(AssignContext context) {
        FilePushTO filePushTO = (FilePushTO) context.element();
        String timestamp = filePushTO.getLogTime();
        DateTimeFormatter formatter = DateTimeFormat.forPattern(CommonConstants.DATE_FORMAT_YYYYMMDD_HHMMSS_SSS)
                .withZoneUTC();
        Instant start_point = Instant.parse(timestamp, formatter);
        Calendar cal = DateUtil.getCurrentDateInUTC();
        SimpleDateFormat DATE_FORMATER_PARTITION_NAME = DateUtil.getDateFormater();
        Instant end_point = Instant.parse(DATE_FORMATER_PARTITION_NAME.format(cal.getTime()), formatter);
        return new IntervalWindow(start_point, end_point);
    }

    @Override
    public Coder<IntervalWindow> windowCoder() {
        return IntervalWindow.getCoder();
    }

    @Override
    public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
        return Arrays.asList(assignWindow(c));
    }

    @Override
    public boolean isCompatible(WindowFn<?, ?> other) {
        return false;
    }

    @Override
    public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
        throw new IllegalArgumentException(
                "Attempted to get side input window for GlobalWindow from non-global WindowFn");
    }
}
Filename policy:
public class FileStorageFileNamePolicy extends FileBasedSink.FilenamePolicy {

    private static final long serialVersionUID = 1L;
    private static Logger LOGGER = LoggerFactory.getLogger(FileStorageFileNamePolicy.class);
    private LogTypeEnum logTypeEnum;

    public FileStorageFileNamePolicy(LogTypeEnum logTypeEnum) {
        this.logTypeEnum = logTypeEnum;
    }

    @Override
    public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext context, String extension) {
        IntervalWindow window = (IntervalWindow) context.getWindow();
        String startDate = window.start().toString();
        String dateString = startDate.replace("T", CommonConstants.SPACE)
                .replaceAll(startDate.substring(startDate.indexOf("Z")), CommonConstants.EMPTY_STRING);
        String startDateHour = startDate;
        try {
            startDate = DateUtil.getDateForFileStore(dateString, null);
            startDateHour = DateUtil.getDTLocalTZHour(dateString, null);
        } catch (ParseException e) {
            LOGGER.error("Error converting date : {}", e);
        }
        String filename = new StringBuilder(window.start().toString()).append(CommonConstants.COLON)
                .append(startDateHour).append(CommonConstants.UNDER_SCORE).append(context.getShardNumber())
                .append(".txt").toString();
        String dirName = new StringBuilder(startDate).append(CommonConstants.FORWARD_SLASH)
                .append(logTypeEnum.getValue().toLowerCase()).append(CommonConstants.FORWARD_SLASH).toString();
        LOGGER.info("Directory : {} and File Name : {}", dirName, filename);
        return outputDirectory.resolve(dirName, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
                .resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
    }

    @Override
    public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context, String extension) {
        throw new UnsupportedOperationException("Unsupported.");
    }
}
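I built the custom window on IntervalWindow so that I can get the proper timestamp in the FilenamePolicy. I cannot use FixedWindows because it always gives me the current timestamp.
Everything works fine here, except that the files are not appended to. They get overwritten.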
You can do this with TextIO.write().to(...).withWindowedWrites(), which is available in Beam 2.1. See the TextIO javadoc.
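To make the hourly layout from the question concrete, here is a minimal sketch in plain Java (java.time only, no Beam classes) of how a log timestamp could be truncated to its hour and turned into the target path. The class and method names (HourlyLogPath, windowStart, windowEnd, pathFor) are hypothetical and not part of Beam or of the code in the question. The sketch also assumes the log time has already been parsed into an Instant in UTC.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

// Hypothetical helper: maps a log timestamp to an hourly bucket and a file path.
public class HourlyLogPath {

    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
    private static final DateTimeFormatter HOUR =
            DateTimeFormatter.ofPattern("HH").withZone(ZoneOffset.UTC);

    // Start of the hour that contains logTime (inclusive bound of the bucket).
    static Instant windowStart(Instant logTime) {
        return logTime.truncatedTo(ChronoUnit.HOURS);
    }

    // Start of the next hour (exclusive bound of the bucket).
    static Instant windowEnd(Instant logTime) {
        return windowStart(logTime).plus(1, ChronoUnit.HOURS);
    }

    // Builds a path shaped like the example in the question.
    static String pathFor(String bucket, String application, Instant logTime) {
        Instant start = windowStart(logTime);
        String day = DAY.format(start);
        String hour = HOUR.format(start);
        return "gs://" + bucket + "/applog/" + day + "/" + application
                + "/app-" + day + "-" + hour + "H.log";
    }

    public static void main(String[] args) {
        Instant early = Instant.parse("2017-09-27T11:05:30.123Z");
        Instant late = Instant.parse("2017-09-27T11:59:59.999Z");
        // Both timestamps fall in the same hour, so they share one bucket and one path.
        System.out.println(pathFor("bucket", "application1", early));
        System.out.println(windowStart(early).equals(windowStart(late)));
    }
}
```

The point of the sketch is that the bucket bounds depend only on the log time, so every record from the same hour resolves to the same name. The window in the question instead starts at the exact log time and ends at the current time, so two records from the same hour generally do not share a window.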