AWS Kinesis Firehose 到 Lambda,Lambda 到 S3 使用 java

AWS Kinesis Firehose to Lambda , Lambda to S3 using java

对于我的 expolartion,创建了 AWS firehose 流并配置了 Lambda 函数并将数据移动到 S3。 Firehose 到 S3 它工作正常,没有任何问题。如果我启用 lamda 函数,在 S3 失败存储桶中出现以下错误。

{"attemptsMade":4,"arrivalTimestamp":1570727830210,"errorCode":"Lambda.FunctionError","errorMessage":"The Lambda function was successfully invoked but it returned an error result."

lambda java 代码:

public class LambdaFunctionHandler implements RequestHandler<KinesisEvent, String> {

    @Override
    public String handleRequest(KinesisEvent event, Context context) {
        context.getLogger().log("Input: " + event);
        StringBuffer sb = new StringBuffer();
        for (KinesisEventRecord record : event.getRecords()) {
            String payload = new String(record.getKinesis().getData().array());
            if (payload.toLowerCase().contains("scala"))
                sb.append(payload);
            sb.append("\n");
        }

        return sb.toString();
    }
}

基本上,过滤传入的流数据并推送到 S3。我也有一些疑问。 1.我正在将 punch JSON 数据传递给 firehose。 "record.getKinesis().getData()" 方法将逐行读取记录并将其打包成整个 json 字符串。 2.书面日志声明。在哪里查看我的日志。 我该如何处理这种情况?请指教

AWS Lambda Java Events 2.x library has support for the KinesisFirehoseEvent。 1.x 图书馆没有这个 class。

您的代码将类似于:

public class LambdaFunctionHandler implements RequestHandler<KinesisFirehoseEvent, String> {

    @Override
    public String handleRequest(KinesisFirehoseEvent event, Context context) {
    }
}

在 Lambda 测试环境中,事件将看起来像:

{
  "invocationId": "invocationIdExample",
  "deliveryStreamArn": "arn:aws:kinesis:EXAMPLE",
  "region": "us-west-2",
  "records": [
    {
      "recordId": "49546986683135544286507457936321625675700192471156785154",
      "approximateArrivalTimestamp": 1495072949453,
      "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4="
    }
  ]
}