Elasticsearch 批量 API post 请求中的换行错误
NewLine error in Elasticsearch bulk API post request
我正在尝试使用 Elasticsearch 批量 API 将多条记录插入到索引中。我的 JSON 看起来像这样:request json
我在文档末尾插入了一个换行符(\n),但我仍然收到换行错误(newline error)。
Error: {
"error": {
"root_cause": [
{
"type": "illegal_argument_exception",
"reason": "The bulk request must be terminated by a newline [\n]"
}
],
"type": "illegal_argument_exception",
"reason": "The bulk request must be terminated by a newline [\n]"
},
"status": 400
}
你的 JSON 原本应该是 nd-json(以换行符分隔的 JSON),但现在看起来一团糟,所以我们必须事先做一些清理工作。
初始化:
// Pull the Client constructor out of the @elastic/elasticsearch package.
const { Client } = require("@elastic/elasticsearch");

// Connection settings: a single node reachable on localhost.
const client_options = { node: 'http://localhost:9200' };
const client = new Client(client_options);

// Index name used by the bulk action lines.
const INDEX_NAME = 'index_name';
将可能的 ndjson 转换为可使用的数组或对象:
// Raw payload: action lines and documents that ended up on one line,
// separated by spaces instead of newlines.
const raw_bulk_text = `{"index":{}} {"tags":["ab","cd"],"question":"test this","answer":"answer first"} {"index":{}} {"tags":["de","fg"],"question":"test second","answer":"answer second"}`;

// Splitting on a capturing group keeps the `{"index":{}}` separators in the
// result, so once empty strings are dropped the pieces alternate between
// separator (even position) and document (odd position).
const doc_fragments = raw_bulk_text
    .split(/(\s?{"index":{}} )/g)
    .filter((piece) => piece.length > 0)
    .filter((_, position) => position % 2 === 1);

// Comma-join the document fragments and wrap them in brackets so the whole
// string parses as one JSON array.
const docs_as_body_params = JSON.parse('[' + doc_fragments.join(',') + ']');
构造批量请求体:
// Flat bulk payload: every document is preceded by its own action line.
const bulk_body = [];
for (const doc of docs_as_body_params) {
    // Action metadata: the target index, plus the document's `id` when it is
    // truthy (null otherwise).
    const action = {
        index: {
            _index: INDEX_NAME,
            _id: doc.id || null
        }
    };
    bulk_body.push(action, doc);
}
执行批量索引:
// Send the bulk request. The callback receives a transport-level error (if
// any) and the client's response object.
client.bulk({
        body: bulk_body
    },
    (err, resp) => {
        // Item-level failure flag. `resp.errors` is the original check; since
        // this callback reads `resp.body.items`, the flag is also looked up on
        // `resp.body`.
        // NOTE(review): confirm where the `errors` flag lives for the client
        // version in use.
        const item_errors = resp && (resp.errors || (resp.body && resp.body.errors));
        if (err || item_errors) {
            // `console.err` does not exist; `console.error` is the stderr logger.
            console.error(err || item_errors);
        }
        // Only print the per-item results when a response body is present, so
        // a failed request does not throw a second error from inside the callback.
        if (resp && resp.body) {
            console.info(resp.body.items);
        }
    }
);
基于我之前的回答,改为通过 AWS SDK 发送带签名的请求:
const AWS = require('aws-sdk');

// Credentials provider constructed with the 'AWS' prefix.
const creds = new AWS.EnvironmentCredentials('AWS');
const INDEX_NAME = 'index_name';
const esDomain = {
    region: 'us-east-1',
    endpoint: 'yoursearchdomain.region.amazonaws.com',
    index: 'myindex',
    doctype: 'mytype'
};
const endpoint = new AWS.Endpoint(esDomain.endpoint);
const req = new AWS.HttpRequest(endpoint);

// Recover the documents from the space-separated payload. Splitting on a
// capturing group keeps the `{"index":{}}` separators, so after dropping empty
// strings the documents sit at the odd positions.
const docs_as_body_params = JSON.parse(
    '[' +
    `{"index":{}} {"tags":["ab","cd"],"question":"test this","answer":"answer first"} {"index":{}} {"tags":["de","fg"],"question":"test second","answer":"answer second"}`
        .split(/(\s?{"index":{}} )/g)
        .filter(match => match.length)
        .filter((_, index) => index % 2 !== 0)
        .join(',') +
    ']'
);

// Flat bulk payload: every document is preceded by its own action line.
const bulk_body = [];
for (const doc of docs_as_body_params) {
    bulk_body.push({
        index: {
            _index: INDEX_NAME,
            _id: doc.id || null
        }
    }, doc);
}

/// THE MOST IMPORTANT PART -- getting to a valid ndjson
// One JSON value per line, with the trailing newline that the bulk endpoint's
// "must be terminated by a newline" error asks for.
const ndjson_payload = bulk_body.map(JSON.stringify).join('\n') + '\n';

req.method = 'POST';
// The request path needs its leading slash; a bare '_bulk' is not a valid
// HTTP request target.
req.path = '/_bulk';
req.region = esDomain.region;
req.headers['presigned-expires'] = false;
req.headers['Host'] = endpoint.host;
req.headers['Content-Type'] = 'application/json';
req.body = ndjson_payload;

// Sign the request for the 'es' service before sending it.
const signer = new AWS.Signers.V4(req, 'es');
signer.addAuthorization(creds, new Date());

// NOTE(review): `context` is not defined in this snippet; presumably it is the
// Lambda handler's context object -- confirm against the enclosing handler.
const send = new AWS.NodeHttpClient();
send.handleRequest(req, null, function (httpResp) {
    let respBody = '';
    httpResp.on('data', function (chunk) {
        respBody += chunk;
    });
    httpResp.on('end', function () {
        console.log('Response: ' + respBody);
        // `doc` is not in scope here (the original threw a ReferenceError);
        // report how many documents were submitted instead.
        context.succeed('Lambda added ' + docs_as_body_params.length + ' documents');
    });
}, function (err) {
    console.log('Error: ' + err);
    context.fail('Lambda failed with error ' + err);
});
我正在尝试使用 Elasticsearch 批量 API 将多条记录插入到索引中。我的 JSON 看起来像这样:request json
我在文档末尾插入了一个换行符(\n),但我仍然收到换行错误(newline error)。
Error: {
"error": {
"root_cause": [
{
"type": "illegal_argument_exception",
"reason": "The bulk request must be terminated by a newline [\n]"
}
],
"type": "illegal_argument_exception",
"reason": "The bulk request must be terminated by a newline [\n]"
},
"status": 400
}
你的 JSON 原本应该是 nd-json(以换行符分隔的 JSON),但现在看起来一团糟,所以我们必须事先做一些清理工作。
初始化:
// Pull the Client constructor out of the @elastic/elasticsearch package.
const { Client } = require("@elastic/elasticsearch");

// Connection settings: a single node reachable on localhost.
const client_options = { node: 'http://localhost:9200' };
const client = new Client(client_options);

// Index name used by the bulk action lines.
const INDEX_NAME = 'index_name';
将可能的 ndjson 转换为可使用的数组或对象:
// Raw payload: action lines and documents that ended up on one line,
// separated by spaces instead of newlines.
const raw_bulk_text = `{"index":{}} {"tags":["ab","cd"],"question":"test this","answer":"answer first"} {"index":{}} {"tags":["de","fg"],"question":"test second","answer":"answer second"}`;

// Splitting on a capturing group keeps the `{"index":{}}` separators in the
// result, so once empty strings are dropped the pieces alternate between
// separator (even position) and document (odd position).
const doc_fragments = raw_bulk_text
    .split(/(\s?{"index":{}} )/g)
    .filter((piece) => piece.length > 0)
    .filter((_, position) => position % 2 === 1);

// Comma-join the document fragments and wrap them in brackets so the whole
// string parses as one JSON array.
const docs_as_body_params = JSON.parse('[' + doc_fragments.join(',') + ']');
构造批量请求体:
// Flat bulk payload: every document is preceded by its own action line.
const bulk_body = [];
for (const doc of docs_as_body_params) {
    // Action metadata: the target index, plus the document's `id` when it is
    // truthy (null otherwise).
    const action = {
        index: {
            _index: INDEX_NAME,
            _id: doc.id || null
        }
    };
    bulk_body.push(action, doc);
}
执行批量索引:
// Send the bulk request. The callback receives a transport-level error (if
// any) and the client's response object.
client.bulk({
        body: bulk_body
    },
    (err, resp) => {
        // Item-level failure flag. `resp.errors` is the original check; since
        // this callback reads `resp.body.items`, the flag is also looked up on
        // `resp.body`.
        // NOTE(review): confirm where the `errors` flag lives for the client
        // version in use.
        const item_errors = resp && (resp.errors || (resp.body && resp.body.errors));
        if (err || item_errors) {
            // `console.err` does not exist; `console.error` is the stderr logger.
            console.error(err || item_errors);
        }
        // Only print the per-item results when a response body is present, so
        // a failed request does not throw a second error from inside the callback.
        if (resp && resp.body) {
            console.info(resp.body.items);
        }
    }
);
基于我之前的回答,改为通过 AWS SDK 发送带签名的请求:
const AWS = require('aws-sdk');

// Credentials provider constructed with the 'AWS' prefix.
const creds = new AWS.EnvironmentCredentials('AWS');
const INDEX_NAME = 'index_name';
const esDomain = {
    region: 'us-east-1',
    endpoint: 'yoursearchdomain.region.amazonaws.com',
    index: 'myindex',
    doctype: 'mytype'
};
const endpoint = new AWS.Endpoint(esDomain.endpoint);
const req = new AWS.HttpRequest(endpoint);

// Recover the documents from the space-separated payload. Splitting on a
// capturing group keeps the `{"index":{}}` separators, so after dropping empty
// strings the documents sit at the odd positions.
const docs_as_body_params = JSON.parse(
    '[' +
    `{"index":{}} {"tags":["ab","cd"],"question":"test this","answer":"answer first"} {"index":{}} {"tags":["de","fg"],"question":"test second","answer":"answer second"}`
        .split(/(\s?{"index":{}} )/g)
        .filter(match => match.length)
        .filter((_, index) => index % 2 !== 0)
        .join(',') +
    ']'
);

// Flat bulk payload: every document is preceded by its own action line.
const bulk_body = [];
for (const doc of docs_as_body_params) {
    bulk_body.push({
        index: {
            _index: INDEX_NAME,
            _id: doc.id || null
        }
    }, doc);
}

/// THE MOST IMPORTANT PART -- getting to a valid ndjson
// One JSON value per line, with the trailing newline that the bulk endpoint's
// "must be terminated by a newline" error asks for.
const ndjson_payload = bulk_body.map(JSON.stringify).join('\n') + '\n';

req.method = 'POST';
// The request path needs its leading slash; a bare '_bulk' is not a valid
// HTTP request target.
req.path = '/_bulk';
req.region = esDomain.region;
req.headers['presigned-expires'] = false;
req.headers['Host'] = endpoint.host;
req.headers['Content-Type'] = 'application/json';
req.body = ndjson_payload;

// Sign the request for the 'es' service before sending it.
const signer = new AWS.Signers.V4(req, 'es');
signer.addAuthorization(creds, new Date());

// NOTE(review): `context` is not defined in this snippet; presumably it is the
// Lambda handler's context object -- confirm against the enclosing handler.
const send = new AWS.NodeHttpClient();
send.handleRequest(req, null, function (httpResp) {
    let respBody = '';
    httpResp.on('data', function (chunk) {
        respBody += chunk;
    });
    httpResp.on('end', function () {
        console.log('Response: ' + respBody);
        // `doc` is not in scope here (the original threw a ReferenceError);
        // report how many documents were submitted instead.
        context.succeed('Lambda added ' + docs_as_body_params.length + ' documents');
    });
}, function (err) {
    console.log('Error: ' + err);
    context.fail('Lambda failed with error ' + err);
});