AWS Firehose to Elastic Search - Transforming one Firehose record into multiple Elastic entries
I have a normalized JSON string coming into my Lambda function, which forwards it via Firehose to Elastic Search.
My plan is to use a Kinesis Data Transformation Lambda (see https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html) to denormalize the JSON so that the correct Elastic Search entries come out of it.
This question is about whether that is doable and how to do it.
This is my basic AWS setup:
- Lambda function: receives the normalized JSON from the internet, validates it, attaches some properties, and forwards it to Kinesis Firehose via putRecord
- Kinesis Firehose transformation lambda: receives one record, reads the JSON, produces multiple items from it, and returns them to Firehose, which forwards them to Elastic Search
My question is: can Firehose/the transformation lambda create multiple Elastic Search entries out of one record?
I'll try to illustrate the scenario with some pseudocode:
lambda.js
const AWS = require('aws-sdk');
const firehose = new AWS.Firehose();

exports.handler = async function (event) {
  // 1. Get inputDoc from the request, which contains multiple es_documents
  // 2. Attach a timestamp as a property
  // Result:
  const inputDoc = {
    request_logged_at: '2017-02-07T15:13:01.39256Z',
    es_documents: [
      { foo: 'bar' },
      { foo: 'baz' }
    ]
  };

  const firehoseParams = {
    DeliveryStreamName: 'Delivery-Stream',
    Record: {
      Data: JSON.stringify(inputDoc)
    }
  };
  await firehose.putRecord(firehoseParams).promise();
  return { statusCode: 201 };
};
firehose/transform_lambda.js
exports.handler = (event, context) => {
  const record = event.records[0];
  const myDoc = JSON.parse(Buffer.from(record.data, 'base64').toString('utf8'));

  // Denormalize request_logged_at into the single documents,
  // so every document in Elastic knows when it got logged
  const docsToElastic = myDoc.es_documents.map(doc => {
    doc.request_logged_at = myDoc.request_logged_at;
    return doc;
  });

  // This is the main point: how to return this array back to Firehose
  // so that Elastic creates multiple entries?
  // My first guess is the following, as I've seen this syntax in other places
  // (see the first "Note" on https://docs.aws.amazon.com/firehose/latest/dev/record-format-conversion.html)
  const result = docsToElastic.reduce((accumulator, doc) => {
    return accumulator + JSON.stringify(doc);
  }, '');
  // result: '{"foo":"bar","request_logged_at":"2017-02-07T15:13:01.39256Z"}{"foo":"baz","request_logged_at":"2017-02-07T15:13:01.39256Z"}'

  const payload = Buffer.from(result, 'utf8').toString('base64');
  return {
    recordId: record.recordId,
    result: 'Ok',
    data: payload
  };
};
Can anyone share their knowledge about this use case? Will this work?
By the way: I know I could do the denormalization in the first lambda and use firehose.putRecordBatch(), but the first lambda already has plenty of tasks, and this is also a matter of separation of concerns.
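For reference, here is a minimal sketch of that alternative, assuming the stream name and document shape from the example above (denormalize in the first lambda, then send one Firehose record per document):

const AWS = require('aws-sdk');
const firehose = new AWS.Firehose();

// Hypothetical helper: one Firehose record per Elastic Search document
async function putDocsBatch(inputDoc) {
  const records = inputDoc.es_documents.map(doc => ({
    Data: JSON.stringify({ ...doc, request_logged_at: inputDoc.request_logged_at })
  }));
  // putRecordBatch accepts up to 500 records per call
  await firehose.putRecordBatch({
    DeliveryStreamName: 'Delivery-Stream',
    Records: records
  }).promise();
}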
I solved it myself. This hint in the docs gave me the idea:
It [Firehose] then generates an Elasticsearch bulk request to index multiple records to your Elasticsearch cluster. Make sure that your record is UTF-8 encoded and flattened to a single-line JSON object before you send it to Kinesis Data Firehose.
So what actually happens behind the scenes is that Firehose performs an ES bulk request. Firehose probably does something like this:
POST _bulk
{ "index" : { "_index" : "my-index" } }
[FirehoseRecord.Data]
{ "index" : { "_index" : "my-index" } }
[FirehoseRecord.Data]
{ "index" : { "_index" : "my-index" } }
[FirehoseRecord.Data]
So what I did was modify the one FirehoseRecord.Data to contain multiple documents, one per line, each separated by an { "index" : ...} object:
const result = docsToElastic.reduce((accumulator, log, currentIndex) => {
  return accumulator +
    // Skip the index action for the first document, because Firehose prepends it itself
    (currentIndex === 0 ? '' : '{ "index" : { "_index" : "my-index" } }' + '\n') +
    JSON.stringify(log) + '\n';
}, '');
Output:
{"foo":"bar","request_logged_at":"2017-02-07T15:13:01.39256Z"}
{ "index" : { "_index" : "my-index" } }
{"foo":"baz","request_logged_at":"2017-02-07T15:13:01.39256Z"}
Note the missing first { "index" : ...} object: Firehose prepends that first { "index" : ...} in front of the whole record itself.
This has been tested and works.
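For completeness, here is a sketch of how the whole transformation lambda might look with this fix applied. It handles every record in the event rather than just the first; the index name my-index is an assumption carried over from the examples above:

exports.handler = async (event) => {
  const records = event.records.map(record => {
    const myDoc = JSON.parse(Buffer.from(record.data, 'base64').toString('utf8'));

    // Denormalize request_logged_at into every single document
    const docsToElastic = myDoc.es_documents.map(doc => ({
      ...doc,
      request_logged_at: myDoc.request_logged_at
    }));

    // Build the multi-line payload; Firehose prepends the first index action itself
    const result = docsToElastic.reduce((accumulator, log, currentIndex) => {
      return accumulator +
        (currentIndex === 0 ? '' : '{ "index" : { "_index" : "my-index" } }\n') +
        JSON.stringify(log) + '\n';
    }, '');

    return {
      recordId: record.recordId,
      result: 'Ok',
      data: Buffer.from(result, 'utf8').toString('base64')
    };
  });
  return { records };
};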