Kinesis Firehose to ES using a lambda transformation

I want to take the logs from a subscription filter, put them into an S3 bucket, and also send them to ES.

Similar to the diagram here:

https://aws.amazon.com/solutions/implementations/centralized-logging/
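
The log group is wired to the delivery stream with a subscription filter, roughly like this (a minimal CDK sketch; logGroup, cwlToFirehoseRole and the construct id are placeholders for my actual setup):

new CfnSubscriptionFilter(this, "SubscriptionFilter", {
  logGroupName: logGroup.logGroupName,             // placeholder: the source log group
  destinationArn: firehoseDeliveryStream.attrArn,  // the delivery stream shown further down
  filterPattern: "",                               // empty pattern forwards every log event
  roleArn: cwlToFirehoseRole.roleArn,              // placeholder: role CloudWatch Logs assumes to call Firehose
});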

When I use this function:

/*
 For processing data sent to Firehose by Cloudwatch Logs subscription filters.
 Cloudwatch Logs sends to Firehose records that look like this:
 {
   "messageType": "DATA_MESSAGE",
   "owner": "123456789012",
   "logGroup": "log_group_name",
   "logStream": "log_stream_name",
   "subscriptionFilters": [
     "subscription_filter_name"
   ],
   "logEvents": [
     {
       "id": "01234567890123456789012345678901234567890123456789012345",
       "timestamp": 1510109208016,
       "message": "log message 1"
     },
     {
       "id": "01234567890123456789012345678901234567890123456789012345",
       "timestamp": 1510109208017,
       "message": "log message 2"
     }
     ...
   ]
 }
 The data is additionally compressed with GZIP.
 The code below will:
 1) Gunzip the data
 2) Parse the json
 3) Set the result to ProcessingFailed for any record whose messageType is not DATA_MESSAGE, thus redirecting them to the
    processing error output. Such records do not contain any log events. You can modify the code to set the result to
    Dropped instead to get rid of these records completely.
 4) For records whose messageType is DATA_MESSAGE, extract the individual log events from the logEvents field, and pass
    each one to the transformLogEvent method. You can modify the transformLogEvent method to perform custom
    transformations on the log events.
 5) Concatenate the result from (4) together and set the result as the data of the record returned to Firehose. Note that
    this step will not add any delimiters. Delimiters should be appended by the logic within the transformLogEvent
    method.
 6) Any additional records which exceed 6MB will be re-ingested back into Firehose.
 */
const zlib = require('zlib');
const AWS = require('aws-sdk');

/**
 * logEvent has this format:
 *
 * {
 *   "id": "01234567890123456789012345678901234567890123456789012345",
 *   "timestamp": 1510109208016,
 *   "message": "log message 1"
 * }
 *
 * The default implementation below just extracts the message and appends a newline to it.
 *
 * The result must be returned in a Promise.
 */
function transformLogEvent(logEvent: any) {
  return Promise.resolve(`${logEvent.message}\n`);
}

function putRecordsToFirehoseStream(streamName: any, records: any, client: any, resolve: any, reject: any, attemptsMade: any, maxAttempts: any) {
  client.putRecordBatch({
    DeliveryStreamName: streamName,
    Records: records,
  }, (err: any, data: any) => {
    const codes = [];
    let failed = [];
    let errMsg = err;

    if (err) {
      failed = records;
    } else {
      for (let i = 0; i < data.RequestResponses.length; i++) {
        const code = data.RequestResponses[i].ErrorCode;
        if (code) {
          codes.push(code);
          failed.push(records[i]);
        }
      }
      errMsg = `Individual error codes: ${codes}`;
    }

    if (failed.length > 0) {
      if (attemptsMade + 1 < maxAttempts) {
        console.log('Some records failed while calling PutRecordBatch, retrying. %s', errMsg);
        putRecordsToFirehoseStream(streamName, failed, client, resolve, reject, attemptsMade + 1, maxAttempts);
      } else {
        reject(`Could not put records after ${maxAttempts} attempts. ${errMsg}`);
      }
    } else {
      resolve('');
    }
  });
}

function putRecordsToKinesisStream(streamName: any, records: any, client: any, resolve: any, reject: any, attemptsMade: any, maxAttempts: any) {
  client.putRecords({
    StreamName: streamName,
    Records: records,
  }, (err: any, data: any) => {
    const codes = [];
    let failed = [];
    let errMsg = err;

    if (err) {
      failed = records;
    } else {
      for (let i = 0; i < data.Records.length; i++) {
        const code = data.Records[i].ErrorCode;
        if (code) {
          codes.push(code);
          failed.push(records[i]);
        }
      }
      errMsg = `Individual error codes: ${codes}`;
    }

    if (failed.length > 0) {
      if (attemptsMade + 1 < maxAttempts) {
        console.log('Some records failed while calling PutRecords, retrying. %s', errMsg);
        putRecordsToKinesisStream(streamName, failed, client, resolve, reject, attemptsMade + 1, maxAttempts);
      } else {
        reject(`Could not put records after ${maxAttempts} attempts. ${errMsg}`);
      }
    } else {
      resolve('');
    }
  });
}

function createReingestionRecord(isSas: any, originalRecord: any) {
  if (isSas) {
    return {
      Data: Buffer.from(originalRecord.data, 'base64'),
      PartitionKey: originalRecord.kinesisRecordMetadata.partitionKey,
    };
  } else {
    return {
      Data: Buffer.from(originalRecord.data, 'base64'),
    };
  }
}


function getReingestionRecord(isSas: any, reIngestionRecord: any) {
  if (isSas) {
    return {
      Data: reIngestionRecord.Data,
      PartitionKey: reIngestionRecord.PartitionKey,
    };
  } else {
    return {
      Data: reIngestionRecord.Data,
    };
  }
}

exports.handler = (event: any, context: any, callback: any) => {
  Promise.all(event.records.map(function (r: any) {
    const buffer = Buffer.from(r.data, 'base64');

    let decompressed;
    try {
      decompressed = zlib.unzipSync(buffer);
    } catch (e) {
      return Promise.resolve({
        recordId: r.recordId,
        result: 'ProcessingFailed',
      });
    }

    const data = JSON.parse(decompressed);
    // CONTROL_MESSAGE are sent by CWL to check if the subscription is reachable.
    // They do not contain actual data.
    if (data.messageType === 'CONTROL_MESSAGE') {
      return Promise.resolve({
        recordId: r.recordId,
        result: 'Dropped',
      });
    } else if (data.messageType === 'DATA_MESSAGE') {
      const promises = data.logEvents.map(transformLogEvent);
      return Promise.all(promises)
        .then(transformed => {
          const payload: any = transformed.reduce(function (a: any, v: any) {
            return a + v;
          });
          const encoded = Buffer.from(payload).toString();
          return {
            recordId: r.recordId,
            result: 'Ok',
            data: encoded,
          };
        });
    } else {
      return Promise.resolve({
        recordId: r.recordId,
        result: 'ProcessingFailed',
      });
    }
  })).then(recs => {
    const isSas = Object.prototype.hasOwnProperty.call(event, 'sourceKinesisStreamArn');
    const streamARN = isSas ? event.sourceKinesisStreamArn : event.deliveryStreamArn;
    const region = streamARN.split(':')[3];
    const streamName = streamARN.split('/')[1];
    const result: any = { records: recs };
    let recordsToReingest = [];
    const putRecordBatches: any = [];
    let totalRecordsToBeReingested = 0;
    const inputDataByRecId: any = {};
    event.records.forEach(function (r: any) { inputDataByRecId[r.recordId] = createReingestionRecord(isSas, r) });

    let projectedSize = recs.filter(function (rec: any) { return rec.result === 'Ok' })
      .map(function (r: any) { return r.recordId.length + r.data.length })
      .reduce((a, b) => a + b, 0);
    // 6000000 instead of 6291456 to leave ample headroom for the stuff we didn't account for
    for (let idx = 0; idx < event.records.length && projectedSize > 6000000; idx++) {
      const rec: any = result.records[idx];
      if (rec.result === 'Ok') {
        totalRecordsToBeReingested++;
        recordsToReingest.push(getReingestionRecord(isSas, inputDataByRecId[rec.recordId]));
        projectedSize -= rec.data.length;
        delete rec.data;
        result.records[idx].result = 'Dropped';

        // split out the record batches into multiple groups, 500 records at max per group
        if (recordsToReingest.length === 500) {
          putRecordBatches.push(recordsToReingest);
          recordsToReingest = [];
        }
      }
    }

    if (recordsToReingest.length > 0) {
      // add the last batch
      putRecordBatches.push(recordsToReingest);
    }

    if (putRecordBatches.length > 0) {
      new Promise((resolve, reject) => {
        let recordsReingestedSoFar = 0;
        for (let idx = 0; idx < putRecordBatches.length; idx++) {
          const recordBatch = putRecordBatches[idx];
          if (isSas) {
            const client = new AWS.Kinesis({ region: region });
            putRecordsToKinesisStream(streamName, recordBatch, client, resolve, reject, 0, 20);
          } else {
            const client = new AWS.Firehose({ region: region });
            putRecordsToFirehoseStream(streamName, recordBatch, client, resolve, reject, 0, 20);
          }
          recordsReingestedSoFar += recordBatch.length;
          console.log('Reingested %s/%s records out of %s in to %s stream', recordsReingestedSoFar, totalRecordsToBeReingested, event.records.length, streamName);
        }}).then(
          () => {
            console.log('Reingested all %s records out of %s in to %s stream', totalRecordsToBeReingested, event.records.length, streamName);
            callback(null, result);
          },
          failed => {
            console.log('Failed to reingest records. %s', failed);
            callback(failed, null);
          });
    } else {
      console.log('No records needed to be reingested.');
      callback(null, result);
    }
  }).catch(ex => {
    console.log('Error: ', ex);
    callback(ex, null);
  });
}; 

But I get a Lambda.FunctionError:

Check your function and make sure the output is in required format. In addition to that, make sure the processed records contain valid result status of Dropped, Ok, or ProcessingFailed

Does anyone know which function is suitable for receiving logs from a CloudWatch subscription filter and sending them to both S3 and ES?

My FirehoseDeliveryStream code looks like this:

const firehoseDeliveryStream = new CfnDeliveryStream(this, "FirehoseDeliveryStream", {
      deliveryStreamType: "DirectPut",
      elasticsearchDestinationConfiguration: {
        domainArn: elasticsearchDomain.domainArn,
        roleArn: firehoseDeliveryRole.roleArn,
        indexName: "test",

        s3Configuration: {
          bucketArn: this.logsBucket.bucketArn,
          roleArn: firehoseDeliveryRole.roleArn,
          cloudWatchLoggingOptions: {
            enabled: true,
            logGroupName: firehoseloggroup.logGroupName,
            logStreamName: logstream.logStreamName
          },
        },
        s3BackupMode: "AllDocuments",
        cloudWatchLoggingOptions: {
          enabled: true,
          logGroupName: firehoseloggroup.logGroupName,
          logStreamName: logstream.logStreamName
        },
        processingConfiguration: {
          enabled: true,
          processors: [{
            type: "Lambda",
            parameters: [{
              parameterName: "LambdaArn",
              parameterValue: handler.functionArn,
            }],
          }],
        },
      },
    });
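
The delivery role is also granted permission to invoke the processor Lambda (a minimal sketch, assuming the handler and firehoseDeliveryRole constructs referenced above):

// Without invoke permission on the processor Lambda, Firehose cannot run the transformation.
handler.grantInvoke(firehoseDeliveryRole);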

I have a CloudWatch log group (log-group-1), Kinesis Firehose, Lambda, and S3.

log-group-1 sends logs to Kinesis Firehose (using a subscription filter). Kinesis Firehose invokes the Lambda to process the logs. The Lambda returns the logs to Kinesis Firehose, and Kinesis Firehose saves the transformed logs to S3.

The Lambda receives the following input:

{
  "invocationId": "000ac99...",
  "deliveryStreamArn": "arn:aws:firehose:eu-central-1:123456789123:deliverystream/delivery-09",
  "region": "eu-central-1",
  "records": [
    {
      "recordId": "496199814216613477...",
      "approximateArrivalTimestamp": 1625854080200,
      "data": "H4sIAAAAAAAAADWOwQrCM......"
    },
    {
      "recordId": "4961998142166134...",
      "approximateArrivalTimestamp": 1625854100311,
      "data": "H4sIAAAAAAAAADVPy07DMB......"
    }
  ]
}

To return the transformed messages, you must modify the records list. See the example below, followed by a minimal sketch of a single output record:

"records": [
  {
    "recordId": "you better take it from the input",
    "result": "can be Ok, Dropped, ProcessingFailed",
    "data": "must be an encoded base-64 string"
  }
]
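
In short, every returned record must echo the recordId from the input and carry a base64-encoded payload. A minimal sketch for building one output record (plain JavaScript; record and transformedString are placeholders):

// Sketch: build one output record for Firehose.
const outputRecord = {
  recordId: record.recordId,                                // must match the input recordId
  result: 'Ok',                                             // or 'Dropped' / 'ProcessingFailed'
  data: Buffer.from(transformedString).toString('base64'),  // payload must be base64-encoded
};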

I have attached the code, written in JavaScript. Just copy and paste it into the Lambda.

const node_gzip_1 = require("node-gzip");

async function handler(event) {
  console.log('event: ' + JSON.stringify(event, undefined, 3));
  let result = [];

  // Iterate through records list
  const records = event.records;
  for (let ii = 0; ii < records.length; ii++) {
    const record = records[ii];
    const recordId = record.recordId;

    // Transform record data to a human readable string
    const data = record.data;
    const decodedData = Buffer.from(data, 'base64');
    const ungziped = await node_gzip_1.ungzip(decodedData);
    console.log('ungziped: ' + ungziped);

    // Parse record data to JSON
    const dataJson = JSON.parse(ungziped.toString());

    // Get a list of log events and iterate through each element
    const logEventsList = dataJson.logEvents;
    logEventsList.forEach((logEventValue) => {
      // Get the message which was saved in CloudWatch
      const messageString = logEventValue.message;

      // Create the transformed result
      const transformedResultJson = {
        someRandomNumber: Math.random(), // Some random variable I decided to put in the result
        message: messageString + '-my-custom-change' // Edit the message
      };

      // Final data must be encoded to base 64
      const messageBase64 = Buffer.from(JSON.stringify(transformedResultJson) + '\n').toString('base64'); // Adding a new line to transformed result is optional. It just make reading the S3 easier
      console.log('messageBase64: ' + messageBase64);

      // Save transformed result
      result.push({
        recordId: recordId,
        result: 'Ok',
        data: messageBase64
      });
    });
  }

  // Replace initial records list with the transformed list
  event.records = result;
  console.log('new event: ' + JSON.stringify(event, undefined, 2));

  // Returned value will go back to kinesis firehose, then S3
  return event;
}
exports.handler = handler;

The Lambda's return value is:

{
  "invocationId": "000ac99...",
  "deliveryStreamArn": "arn:aws:firehose:eu-central-1:123456789123:deliverystream/delivery-09",
  "region": "eu-central-1",
  "records": [
    {
      "recordId": "496199814216613477...",
      "result": "Ok",
      "data": "eyJzb21lUmF..."
    },
    {
      "recordId": "4961998142166134...",
      "result": "Ok",
      "data": "eyJzb21lUmFuZG9..."
    }
  ]
}
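
To sanity-check the output, base64-decode the data field of a returned record; it should contain the JSON built in the handler (sketch; returnedRecord is a placeholder):

// Sketch: decode a returned record to verify the payload.
const decoded = Buffer.from(returnedRecord.data, 'base64').toString('utf8');
console.log(decoded); // e.g. {"someRandomNumber":0.42,"message":"log message 1-my-custom-change"}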

You can also use the Lambda blueprint kinesis-firehose-syslog-to-json.
