Elasticsearch 批量 API post 请求中的换行错误

NewLine error in Elasticsearch bulk API post request

我正在尝试使用 Elasticsearch 批量 (bulk) API 将多条记录插入到索引中。我的请求 JSON 如下所示:

我在文档末尾插入了一个换行符 (\n),但我仍然收到换行符错误 (newline error):

    Error: {
        "error": {
            "root_cause": [
                {
                    "type": "illegal_argument_exception",
                    "reason": "The bulk request must be terminated by a newline [\n]"
                }
            ],
            "type": "illegal_argument_exception",
            "reason": "The bulk request must be terminated by a newline [\n]"
        },
        "status": 400
    }

你的 JSON 在某种意义上是 ndjson(换行符分隔的 JSON),但目前格式很混乱,所以我们必须先做一些清理工作。

初始化:

// Elasticsearch JS client (v7 callback-style API) pointed at a local node.
const {
    Client
} = require("@elastic/elasticsearch");

const client = new Client({
    node: 'http://localhost:9200'
});

// Destination index for every document in the bulk request.
const INDEX_NAME = 'index_name';

将可能的 ndjson 转换为可使用的数组或对象:

// Convert the pseudo-ndjson string into an array of document objects.
// The raw payload alternates `{"index":{}}` action markers with document JSON,
// so we split on the marker and keep only the document halves.
// Parsing each document on its own avoids the original trick of relying on
// implicit Array->String coercion ('[' + array + ']') plus one big JSON.parse.
const docs_as_body_params =
    `{"index":{}} {"tags":["ab","cd"],"question":"test this","answer":"answer first"} {"index":{}} {"tags":["de","fg"],"question":"test second","answer":"answer second"}`
        .split(/(\s?{"index":{}} )/g)
        // filter out empty strings produced by the split
        .filter((part) => part.length > 0)
        // take every odd member (skipping the `{"index":{}}` markers)
        .filter((_, index) => index % 2 !== 0)
        // each remaining part is a standalone JSON document
        .map((json) => JSON.parse(json));

构造批量请求体 (bulk body):

// Interleave one action/metadata line with each document, as the bulk API
// expects: [ {index: ...}, doc1, {index: ...}, doc2, ... ].
// flatMap replaces the original `.map()` that was used purely for its side
// effects (its return value was discarded).
const bulk_body = docs_as_body_params.flatMap((doc) => [
    {
        index: {
            _index: INDEX_NAME,
            // `??` (not `||`) keeps falsy-but-valid ids such as 0 or '';
            // null lets Elasticsearch auto-generate the id.
            _id: doc.id ?? null,
        },
    },
    doc,
]);

执行批量索引:

// Execute the bulk indexing request and report per-item results.
client.bulk({
        body: bulk_body
    },
    (err, resp) => {
        if (err || resp.errors) {
            // BUG FIX: `console.err` does not exist; console.error is the API.
            console.error(err || resp.errors);
            // On a transport error `resp` may be undefined, so don't read resp.body.
            return;
        }
        console.info(resp.body.items);
    }
);

根据我之前的回答,使用 AWS 签名请求的完整示例如下:

// AWS SDK v2 setup for a SigV4-signed request to an Amazon Elasticsearch domain.
const AWS = require('aws-sdk');
// Read credentials from the standard AWS_* environment variables.
const creds = new AWS.EnvironmentCredentials('AWS');

// Destination index for every document in the bulk request.
const INDEX_NAME = 'index_name';

// Target domain description.
// NOTE(review): `index` and `doctype` are not referenced by the code below —
// presumably leftovers from an earlier example; confirm before removing.
const esDomain = {
    region: 'us-east-1',
    endpoint: 'yoursearchdomain.region.amazonaws.com',
    index: 'myindex',
    doctype: 'mytype'
};

const endpoint = new AWS.Endpoint(esDomain.endpoint);
const req = new AWS.HttpRequest(endpoint);


// Rebuild the pseudo-ndjson payload into a proper JSON array, then parse it.
// Splitting on the `{"index":{}}` action marker leaves the documents at the
// odd positions of the resulting array.
const rawSegments =
    `{"index":{}} {"tags":["ab","cd"],"question":"test this","answer":"answer first"} {"index":{}} {"tags":["de","fg"],"question":"test second","answer":"answer second"}`
        .split(/(\s?{"index":{}} )/g);

const docJsonStrings = rawSegments
    // drop the empty strings produced by the split
    .filter((segment) => segment.length)
    // keep the documents, skip the `{"index":{}}` markers
    .filter((_, i) => i % 2 !== 0);

// The array coerces to a comma-joined string inside the template literal,
// yielding a single parseable JSON array.
const docs_as_body_params = JSON.parse(`[${docJsonStrings}]`);

// Interleave one action line and one document per entry, exactly as the
// `_bulk` endpoint expects: [ {index: ...}, doc1, {index: ...}, doc2, ... ].
const bulk_body = docs_as_body_params.flatMap((doc) => [
    { index: { _index: INDEX_NAME, _id: doc.id || null } },
    doc,
]);

/// THE MOST IMPORTANT PART -- getting to a valid ndjson
// Use an explicit arrow: `map(JSON.stringify)` would also forward (index, array)
// as the replacer/space arguments — harmless here only by accident, and a
// classic map-callback footgun.
// The trailing '\n' is mandatory: "The bulk request must be terminated by a newline".
const ndjson_payload = bulk_body.map((doc) => JSON.stringify(doc)).join('\n') + '\n';

req.method = 'POST';
// BUG FIX: the request path must be absolute; '_bulk' without the leading
// slash produces a malformed (and wrongly signed) request path.
req.path = '/_bulk';
req.region = esDomain.region;
req.headers['presigned-expires'] = false;
req.headers['Host'] = endpoint.host;
// application/x-ndjson is the canonical content type for the bulk API's
// newline-delimited body (Elasticsearch also accepts application/json).
req.headers['Content-Type'] = 'application/x-ndjson';
req.body = ndjson_payload;

// Sign the request with SigV4 for the AWS Elasticsearch ('es') service.
const signer = new AWS.Signers.V4(req, 'es');
signer.addAuthorization(creds, new Date());

// Send the signed request and relay the outcome to the Lambda runtime.
// NOTE(review): assumes this runs inside a Lambda handler where `context`
// is in scope — confirm against the surrounding handler.
const send = new AWS.NodeHttpClient();
send.handleRequest(req, null, function (httpResp) {
    let respBody = '';
    httpResp.on('data', function (chunk) {
        respBody += chunk;
    });
    httpResp.on('end', function () {
        console.log('Response: ' + respBody);
        // BUG FIX: the original referenced `doc`, which is not in scope here
        // (it was a map callback parameter) and would throw a ReferenceError.
        // Each document contributes two bulk_body entries, hence the / 2.
        context.succeed('Lambda added ' + (bulk_body.length / 2) + ' documents');
    });
}, function (err) {
    console.log('Error: ' + err);
    context.fail('Lambda failed with error ' + err);
});