在 AWS Lambda 中多次处理 S3 文件

S3 files being processed multiple times in AWS Lambda

我有一个 Java Lambda 函数,每 15 分钟由 S3 事件触发一次。我注意到,在大约每 3 小时的时间段内,每个 Lambda 调用都包含上传的最新文件以及在该 3 小时时间跨度内在它之前上传的所有文件。

因此,如果在遍历整个列表时,它会重复在较早的 Lambda 调用中已经处理过的文件。

如何让它只处理最近上传的文件?在 node.js 中,有一个 context.suceed(),我假设它将该事件标记为已成功处理。 Java 好像没有。

以下是 Cloudwatch 日志。

08:35:16 START RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3 Version: $LATEST
08:35:26 TIME - AUTHENTICATE: 8101ms
08:35:26 TIME - MESSAGE PARSE: 1ms
08:35:26 data :: event/events/2016/    08/31/2016    0831123000.export.csv
08:35:35 Processed 147 events
08:35:35 TIME - FILE PARSE: 9698
08:35:35 Found 1 event files
08:35:35 Total function took: 17800ms
08:35:35 END RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3
08:35:35 REPORT RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3 Duration: 19403.67 ms Billed Duration: 19500 ms Memory Size: 192 MB Max Memory Used: 116 MB
08:45:03 START RequestId: bcb8e064-6f78-11e6-baea-a312004d2418 Version: $LATEST
08:45:03 TIME - AUTHENTICATE: 119ms
08:45:03 TIME - MESSAGE PARSE: 0ms
08:45:03 data :: event/events/2016/    08/31/2016    0831123000.export.csv
08:45:05 Processed 147 events
08:45:05 data :: event/events/2016/    08/31/2016    0831124500.export.csv
08:45:06 Processed 211 events
08:45:06 TIME - FILE PARSE: 2499
08:45:06 Found 2 event files
08:45:06 Total function took: 2618ms
08:45:06 END RequestId: bcb8e064-6f78-11e6-baea-a312004d2418
08:45:06 REPORT RequestId: bcb8e064-6f78-11e6-baea-a312004d2418 Duration: 2796.25 ms Billed Duration: 2800 ms Memory Size: 192 MB Max Memory Used: 116 MB
09:05:02 START RequestId: 8747aa    08-6f7b-11e6-80fd-f30a15cf07fc Version: $LATEST
09:05:02 TIME - AUTHENTICATE: 98ms
09:05:02 TIME - MESSAGE PARSE: 0ms
09:05:02 data :: event/events/2016/    08/31/2016    0831123000.export.csv
09:05:03 Processed 147 events
09:05:03 data :: event/events/2016/    08/31/2016    0831124500.export.csv
09:05:04 Processed 211 events
09:05:04 data :: event/events/2016/    08/31/2016    0831130000.export.csv
09:05:04 Processed 204 events
09:05:04 TIME - FILE PARSE: 2242
09:05:04 Found 3 event files
09:05:04 Total function took: 2340ms
09:05:04 END RequestId: 8747aa    08-6f7b-11e6-80fd-f30a15cf07fc 

编辑 1 我相信迈克尔已经回答了这个问题,但是下面是其他人的一些代码。我确实在使用全局列表来保存记录。

public class LambdaHandler {

private final List<GDELTEventFile> eventFiles = new ArrayList<>();
private AmazonS3Client s3Client;
private final CSVFormat CSV_FORMAT = CSVFormat.TDF.withIgnoreEmptyLines().withTrim();

public void gdeltHandler(S3Event event, Context context) {
    StopWatch sw = new StopWatch();
    long time = 0L;

    sw.start();
    s3Client = new AmazonS3Client(new EnvironmentVariableCredentialsProvider());
    sw.split();
    System.out.println("TIME - AUTHENTICATE: " + sw.getSplitTime() + "ms");
    time += sw.getSplitTime();
    sw.reset();

    sw.start();
    processEvent(event);
    sw.split();
    System.out.println("TIME - MESSAGE PARSE: " + sw.getSplitTime() + "ms");
    time += sw.getSplitTime();
    sw.reset();

    sw.start();
    processFiles();
    sw.split();
    System.out.println("TIME - FILE PARSE: " + sw.getSplitTime());
    time += sw.getSplitTime();

    System.out.println("Found " + eventFiles.size() + " event files");
    System.out.println("Total function took: " + time + "ms");
}

private void processEvent(S3Event event) {
    List<S3EventNotification.S3EventNotificationRecord> records = event.getRecords();
    for (S3EventNotification.S3EventNotificationRecord record : records) {
        long filesize = record.getS3().getObject().getSizeAsLong();
        eventFiles.add(new GDELTEventFile(record.getS3().getBucket().getName(), record.getS3().getObject().getKey(), filesize));
    }
}

private void processFiles() {
    for (GDELTEventFile event : eventFiles) {
        try {
            System.out.println(event.getBucket() + " :: " + event.getFilename());
            GetObjectRequest request = new GetObjectRequest(event.getBucket(), event.getFilename());
            S3Object file = s3Client.getObject(request);
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(file.getObjectContent()))) {
                CSVParser parser = new CSVParser(reader, CSV_FORMAT);
                int count = 0;
                for (CSVRecord record : parser) {
                        count++;
                    }
                }
                System.out.println("Processed " + count + " events");
            }
        } catch (IOException ioe) {
            System.out.println("IOException :: " + ioe);
        }
    }
}

这是一个代码案例,它忽略了 Lambda 的一个重要方面 container reuse -- Lambda 中的容器重用包括进程重用。当一个函数在重用容器中执行时,它也必须 运行 在与之前使用的相同进程中。

S3 的事件通知数据结构是这样的,它可以为每个事件包含多个对象,但我实践中,这永远不会发生......但是将事件数据推送到全局结构中意味着如果容器被重用,然后以后的函数调用将看到旧数据。

虽然这作为缓存非常有用,但它对必须如何设计代码具有重要意义——始终期望但永远不要假设您的进程可以从一次调用到未来、后续调用和相应的代码中存活.

请注意,容器重用还意味着您需要清理任何临时文件,如果容器的多次重用可能会导致那里 space 耗尽。

另请注意,重新部署您的功能代码始终意味着旧容器将被放弃,不会被重新用于最新版本的未来调用。