Logstash -> Elasticsearch - 更新非规范化数据
Logstash -> Elasticsearch - update denormalized data
用例说明
我们有一个关系数据库,其中包含有关我们日常运营的数据。目标是让用户使用全文搜索引擎搜索重要数据。数据是规范化的,因此不是进行全文查询的最佳形式,所以我们的想法是对数据的一个子集进行非规范化,并将其实时复制到 Elasticsearch,这使我们能够创建一个快速准确的搜索应用程序.
我们已经有一个系统可以实现 Event Sourcing 我们的数据库操作(插入、更新、删除)。事件只包含更改的列和主键(在更新时我们没有得到整行)。 Logstash 已收到每个事件的通知,因此这部分已处理。
实际问题
现在我们开始解决问题了。由于计划是对我们的数据进行非规范化,因此我们必须确保父对象的更新传播到 Elasticsearch 中的非规范化子对象。我们如何配置 logstash 来执行此操作?
例子
假设我们在 Elasticsearch 中维护了一个 Employees
的列表。每个 Employee
分配给一个 Company
。由于数据是非规范化的(为了更快的搜索),每个 Employee
还带有 Company
的名称和地址。更新更改了 Company
的名称 - 我们如何配置 logstash 来更新所有 Employees
中分配给 Company
的公司名称?
补充说明
@Darth_Vader:
我们面临的问题是,我们得到一个 Company
已经改变的事件,但是我们想在 Elasticsearch 中修改 Employee
类型的文档,因为它们本身携带了关于公司的数据。您的回答预计我们会为每个 Employee
获得一个事件,但事实并非如此。
也许这样会更清楚。我们在 Elasticsearch 中有 3 名员工:
{type:'employee',id:'1',name:'Person 1',company.cmp_id:'1',company.name:'Company A'}
{type:'employee',id:'2',name:'Person 2',company.cmp_id:'1',company.name:'Company A'}
{type:'employee',id:'3',name:'Person 3',company.cmp_id:'2',company.name:'Company B'}
然后在源数据库中发生更新。
UPDATE company SET name = 'Company NEW' WHERE cmp_id = 1;
我们在 logstash 中得到一个事件,它说的是这样的:
{type:'company',cmp_id:'1',old.name:'Company A',new.name:'Company NEW'}
然后应将其传播到 Elasticsearch,以便生成的员工为:
{type:'employee',id:'1',name:'Person 1',company.cmp_id:'1',company.name:'Company NEW'}
{type:'employee',id:'2',name:'Person 2',company.cmp_id:'1',company.name:'Company NEW'}
{type:'employee',id:'3',name:'Person 3',company.cmp_id:'2',company.name:'Company B'}
请注意字段 company.name
已更改。
我建议采用与我发布的内容类似的解决方案 ,即使用 http
输出插件以便通过对 Employee 索引的查询调用发出更新。查询需要如下所示:
POST employees/_update_by_query
{
"script": {
"source": "ctx._source.company.name = params.name",
"lang": "painless",
"params": {
"name": "Company NEW"
}
},
"query": {
"term": {
"company.cmp_id": "1"
}
}
}
因此您的 Logstash 配置应如下所示:
input {
...
}
filter {
mutate {
add_field => {
"[script][lang]" => "painless"
"[script][source]" => "ctx._source.company.name = params.name"
"[script][params][name]" => "%{new.name}"
"[query][term][company.cmp_id]" => "%{cmp_id}"
}
remove_field => ["host", "@version", "@timestamp", "type", "cmp_id", "old.name", "new.name"]
}
}
output {
http {
url => "http://localhost:9200/employees/_update_by_query"
http_method => "post"
format => "json"
}
}
用例说明
我们有一个关系数据库,其中包含有关我们日常运营的数据。目标是让用户使用全文搜索引擎搜索重要数据。数据是规范化的,因此不是进行全文查询的最佳形式,所以我们的想法是对数据的一个子集进行非规范化,并将其实时复制到 Elasticsearch,这使我们能够创建一个快速准确的搜索应用程序.
我们已经有一个系统可以实现 Event Sourcing 我们的数据库操作(插入、更新、删除)。事件只包含更改的列和主键(在更新时我们没有得到整行)。 Logstash 已收到每个事件的通知,因此这部分已处理。
实际问题
现在我们开始解决问题了。由于计划是对我们的数据进行非规范化,因此我们必须确保父对象的更新传播到 Elasticsearch 中的非规范化子对象。我们如何配置 logstash 来执行此操作?
例子
假设我们在 Elasticsearch 中维护了一个 Employees
的列表。每个 Employee
分配给一个 Company
。由于数据是非规范化的(为了更快的搜索),每个 Employee
还带有 Company
的名称和地址。更新更改了 Company
的名称 - 我们如何配置 logstash 来更新所有 Employees
中分配给 Company
的公司名称?
补充说明
@Darth_Vader:
我们面临的问题是,我们得到一个 Company
已经改变的事件,但是我们想在 Elasticsearch 中修改 Employee
类型的文档,因为它们本身携带了关于公司的数据。您的回答预计我们会为每个 Employee
获得一个事件,但事实并非如此。
也许这样会更清楚。我们在 Elasticsearch 中有 3 名员工:
{type:'employee',id:'1',name:'Person 1',company.cmp_id:'1',company.name:'Company A'}
{type:'employee',id:'2',name:'Person 2',company.cmp_id:'1',company.name:'Company A'}
{type:'employee',id:'3',name:'Person 3',company.cmp_id:'2',company.name:'Company B'}
然后在源数据库中发生更新。
UPDATE company SET name = 'Company NEW' WHERE cmp_id = 1;
我们在 logstash 中得到一个事件,它说的是这样的:
{type:'company',cmp_id:'1',old.name:'Company A',new.name:'Company NEW'}
然后应将其传播到 Elasticsearch,以便生成的员工为:
{type:'employee',id:'1',name:'Person 1',company.cmp_id:'1',company.name:'Company NEW'}
{type:'employee',id:'2',name:'Person 2',company.cmp_id:'1',company.name:'Company NEW'}
{type:'employee',id:'3',name:'Person 3',company.cmp_id:'2',company.name:'Company B'}
请注意字段 company.name
已更改。
我建议采用与我发布的内容类似的解决方案 http
输出插件以便通过对 Employee 索引的查询调用发出更新。查询需要如下所示:
POST employees/_update_by_query
{
"script": {
"source": "ctx._source.company.name = params.name",
"lang": "painless",
"params": {
"name": "Company NEW"
}
},
"query": {
"term": {
"company.cmp_id": "1"
}
}
}
因此您的 Logstash 配置应如下所示:
input {
...
}
filter {
mutate {
add_field => {
"[script][lang]" => "painless"
"[script][source]" => "ctx._source.company.name = params.name"
"[script][params][name]" => "%{new.name}"
"[query][term][company.cmp_id]" => "%{cmp_id}"
}
remove_field => ["host", "@version", "@timestamp", "type", "cmp_id", "old.name", "new.name"]
}
}
output {
http {
url => "http://localhost:9200/employees/_update_by_query"
http_method => "post"
format => "json"
}
}