使用 Logstash 将 json 字符串注入不同的 Elasticsearch 索引
Injecting json string into different Elasticsearch indices using Logstash
我将这些示例数据存储在 postgresql 的 events
表中:
{"playhead":0,"metadata":"{\"class\":\"Broadway\\Domain\\Metadata\",\"payload\":{\"console\":{\"command\":\"Doctrine\\Bundle\\FixturesBundle\\Command\\LoadDataFixturesDoctrineCommand\",\"arguments\":\"'doctrine:fixtures:load' --env=prod -n\"}}}","@timestamp":"2020-05-13T18:03:24.295Z","payload":"{\"class\":\"OpenLoyalty\\Component\\Transaction\\Domain\\Event\\TransactionWasRegistered\",\"payload\":{\"transactionId\":\"91d83147-0280-481e-a984-c899e9720ec2\",\"transactionData\":{\"documentNumber\":\"not-matched\",\"purchasePlace\":\"wroclaw\",\"purchaseDate\":1582622706,\"documentType\":\"sell\"},\"customerData\":{\"name\":\"Jan Nowak\",\"email\":\"not_matched@example.com\",\"nip\":\"aaa\",\"phone\":\"+0954730076810\",\"loyaltyCardNumber\":\"not_matched_card_number\",\"address\":{\"street\":\"Ko\u015bciuszki\",\"address1\":\"12\",\"city\":\"Warsaw\",\"country\":\"PL\",\"province\":\"Mazowieckie\",\"postal\":\"00-800\"}},\"items\":[{\"sku\":{\"code\":\"SKU1\"},\"name\":\"item 1\",\"quantity\":1,\"grossValue\":1,\"category\":\"aaa\",\"labels\":[{\"key\":\"test\",\"value\":\"label\"},{\"key\":\"test\",\"value\":\"label2\"}],\"maker\":\"sss\"},{\"sku\":{\"code\":\"SKU2\"},\"name\":\"item 2\",\"quantity\":2,\"grossValue\":2,\"category\":\"bbb\",\"labels\":[],\"maker\":\"ccc\"}],\"posId\":null,\"excludedDeliverySKUs\":null,\"excludedLevelSKUs\":null,\"excludedLevelCategories\":null,\"revisedDocument\":null,\"labels\":[]}}","@version":"1","id":1,"type":"OpenLoyalty.Component.Transaction.Domain.Event.TransactionWasRegistered","uuid":"91d83147-0280-481e-a984-c899e9720ec2","recorded_on":"2020-02-24T09:25:06.222986+00:00"}
{"playhead":1,"metadata":"{\"class\":\"Broadway\\Domain\\Metadata\",\"payload\":{\"console\":{\"command\":\"Doctrine\\Bundle\\FixturesBundle\\Command\\LoadDataFixturesDoctrineCommand\",\"arguments\":\"'doctrine:fixtures:load' --env=prod -n\"}}}","@timestamp":"2020-05-13T18:03:24.430Z","payload":"{\"class\":\"OpenLoyalty\\Component\\Customer\\Domain\\Event\\AssignedAccountToCustomer\",\"payload\":{\"customerId\":\"00000000-0000-474c-b092-b0dd880c07aa\",\"accountId\":\"3c067099-486a-41a8-87ca-343305126c5e\"}}","@version":"1","id":80,"type":"OpenLoyalty.Component.Customer.Domain.Event.AssignedAccountToCustomer","uuid":"00000000-0000-474c-b092-b0dd880c07aa","recorded_on":"2020-02-24T09:25:20.012550+00:00"}
{"playhead":2,"metadata":"{\"class\":\"Broadway\\Domain\\Metadata\",\"payload\":{\"console\":{\"command\":\"Doctrine\\Bundle\\FixturesBundle\\Command\\LoadDataFixturesDoctrineCommand\",\"arguments\":\"'doctrine:fixtures:load' --env=prod -n\"}}}","@timestamp":"2020-05-13T18:03:24.430Z","payload":"{\"class\":\"OpenLoyalty\\Component\\Customer\\Domain\\Event\\CustomerWasMovedToLevel\",\"payload\":{\"customerId\":\"00000000-0000-474c-b092-b0dd880c07aa\",\"levelId\":\"e82c96cf-32a3-43bd-9034-4df343e50000\",\"oldLevelId\":null,\"updatedAt\":1582536320,\"manually\":false,\"removeLevelManually\":false}}","@version":"1","id":81,"type":"OpenLoyalty.Component.Customer.Domain.Event.CustomerWasMovedToLevel","uuid":"00000000-0000-474c-b092-b0dd880c07aa","recorded_on":"2020-02-24T09:25:20.014810+00:00"}
以及许多其他具有不同 type
值的记录。
我想要做的是将这些数据的 payload
插入到各自对应的 elasticsearch 索引中。例如 type
= OpenLoyalty.Component.Customer.Domain.Event.AssignedAccountToCustomer 的数据必须进入 oloy.account_details
索引。索引中的文档应该是这样的:
{
"_index": "oloy.account_details",
"_type": "OpenLoyalty\\Component\\Account\\Domain\\ReadModel\\AccountDetails",
"_id": "AXGjvhDQUCC0J4ATWiCP",
"_version": 1,
"_score": 1,
"_source": {
"accountId": "5a9bdb83-f7e8-442c-b06f-387e1b1e95a7",
"customerId": "11111111-0000-474c-b092-b0dd880c07e1"
}
}
我想知道我应该为Logstash 设置什么配置。到目前为止我有这个:
# Read rows from the PostgreSQL `events` table over JDBC.
input {
  jdbc {
    # JDBC driver jar and driver class used to open the connection.
    jdbc_driver_library => "C:\logstash\postgresql-42.2.12.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    # Database location and credentials.
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/openloyalty"
    jdbc_user => "openloyalty"
    jdbc_password => "openloyalty"
    # Query that selects every column of every row in `events`.
    statement => "SELECT * from events"
  }
}

# Send the events to the Elasticsearch node on localhost.
# The "???" values are placeholders that still have to be decided.
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "???"
    document_type => "???"
    document_id => "???"
  }
}
您的答案在这里:
# Read the application log file, starting at the beginning of the file.
input {
  file {
    path => "C:/Users/Khatere Tajfar/Desktop/Log/application.log"
    start_position => "beginning"
    ignore_older => 0
  }
}

filter {
  # Split each INFO log line into the routing values (type, index, id) and
  # the escaped JSON document carried in the "model" attribute (jsonpart).
  grok {
    match => { "message" => "%{GREEDYDATA:part1} INFO %{GREEDYDATA:part2}\{\"indexType\"\:\"%{GREEDYDATA:type}\"\,\"indexName\"\:\"%{GREEDYDATA:index}\"\,\"model\"\:\"%{GREEDYDATA:jsonpart}\"\,\"id\"\:\"%{GREEDYDATA:id}\"\}" }
  }

  mutate {
    # Delete every backslash so the escaped JSON in jsonpart can be parsed.
    # The class has to be written "[\\]": a class that ends in an escaped
    # bracket, such as "[\]", is never closed and is not a valid regex.
    # Replacing directly with "" also avoids a temporary marker character
    # that would wipe out genuine occurrences of that character in the data.
    gsub => ["jsonpart", "[\\]+", ""]
    # Drop the grok leftovers and the bookkeeping fields added by the input.
    remove_field => ["part1", "part2", "message", "path", "host", "@timestamp", "@version"]
  }

  # Expand the cleaned JSON string into top-level event fields.
  json {
    source => "jsonpart"
  }

  mutate {
    remove_field => ["jsonpart"]
  }

  # Copy the routing values under [@metadata] and remove the originals, so
  # they are available to the output settings below without being stored in
  # the indexed document.
  mutate { add_field => { "[@metadata][type]" => "%{type}" } }
  mutate { remove_field => ["type"] }
  mutate { add_field => { "[@metadata][index]" => "%{index}" } }
  mutate { remove_field => ["index"] }
  mutate { add_field => { "[@metadata][id]" => "%{id}" } }
  mutate { remove_field => ["id"] }
}

output {
  # Echo every event to the console as one JSON object per line.
  stdout { codec => json_lines }

  # Route each event to the index / type / id extracted from its log line.
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{[@metadata][index]}"
    document_type => "%{[@metadata][type]}"
    document_id => "%{[@metadata][id]}"
  }
}
我将这些示例数据存储在 postgresql 的 events
表中:
{"playhead":0,"metadata":"{\"class\":\"Broadway\\Domain\\Metadata\",\"payload\":{\"console\":{\"command\":\"Doctrine\\Bundle\\FixturesBundle\\Command\\LoadDataFixturesDoctrineCommand\",\"arguments\":\"'doctrine:fixtures:load' --env=prod -n\"}}}","@timestamp":"2020-05-13T18:03:24.295Z","payload":"{\"class\":\"OpenLoyalty\\Component\\Transaction\\Domain\\Event\\TransactionWasRegistered\",\"payload\":{\"transactionId\":\"91d83147-0280-481e-a984-c899e9720ec2\",\"transactionData\":{\"documentNumber\":\"not-matched\",\"purchasePlace\":\"wroclaw\",\"purchaseDate\":1582622706,\"documentType\":\"sell\"},\"customerData\":{\"name\":\"Jan Nowak\",\"email\":\"not_matched@example.com\",\"nip\":\"aaa\",\"phone\":\"+0954730076810\",\"loyaltyCardNumber\":\"not_matched_card_number\",\"address\":{\"street\":\"Ko\u015bciuszki\",\"address1\":\"12\",\"city\":\"Warsaw\",\"country\":\"PL\",\"province\":\"Mazowieckie\",\"postal\":\"00-800\"}},\"items\":[{\"sku\":{\"code\":\"SKU1\"},\"name\":\"item 1\",\"quantity\":1,\"grossValue\":1,\"category\":\"aaa\",\"labels\":[{\"key\":\"test\",\"value\":\"label\"},{\"key\":\"test\",\"value\":\"label2\"}],\"maker\":\"sss\"},{\"sku\":{\"code\":\"SKU2\"},\"name\":\"item 2\",\"quantity\":2,\"grossValue\":2,\"category\":\"bbb\",\"labels\":[],\"maker\":\"ccc\"}],\"posId\":null,\"excludedDeliverySKUs\":null,\"excludedLevelSKUs\":null,\"excludedLevelCategories\":null,\"revisedDocument\":null,\"labels\":[]}}","@version":"1","id":1,"type":"OpenLoyalty.Component.Transaction.Domain.Event.TransactionWasRegistered","uuid":"91d83147-0280-481e-a984-c899e9720ec2","recorded_on":"2020-02-24T09:25:06.222986+00:00"}
{"playhead":1,"metadata":"{\"class\":\"Broadway\\Domain\\Metadata\",\"payload\":{\"console\":{\"command\":\"Doctrine\\Bundle\\FixturesBundle\\Command\\LoadDataFixturesDoctrineCommand\",\"arguments\":\"'doctrine:fixtures:load' --env=prod -n\"}}}","@timestamp":"2020-05-13T18:03:24.430Z","payload":"{\"class\":\"OpenLoyalty\\Component\\Customer\\Domain\\Event\\AssignedAccountToCustomer\",\"payload\":{\"customerId\":\"00000000-0000-474c-b092-b0dd880c07aa\",\"accountId\":\"3c067099-486a-41a8-87ca-343305126c5e\"}}","@version":"1","id":80,"type":"OpenLoyalty.Component.Customer.Domain.Event.AssignedAccountToCustomer","uuid":"00000000-0000-474c-b092-b0dd880c07aa","recorded_on":"2020-02-24T09:25:20.012550+00:00"}
{"playhead":2,"metadata":"{\"class\":\"Broadway\\Domain\\Metadata\",\"payload\":{\"console\":{\"command\":\"Doctrine\\Bundle\\FixturesBundle\\Command\\LoadDataFixturesDoctrineCommand\",\"arguments\":\"'doctrine:fixtures:load' --env=prod -n\"}}}","@timestamp":"2020-05-13T18:03:24.430Z","payload":"{\"class\":\"OpenLoyalty\\Component\\Customer\\Domain\\Event\\CustomerWasMovedToLevel\",\"payload\":{\"customerId\":\"00000000-0000-474c-b092-b0dd880c07aa\",\"levelId\":\"e82c96cf-32a3-43bd-9034-4df343e50000\",\"oldLevelId\":null,\"updatedAt\":1582536320,\"manually\":false,\"removeLevelManually\":false}}","@version":"1","id":81,"type":"OpenLoyalty.Component.Customer.Domain.Event.CustomerWasMovedToLevel","uuid":"00000000-0000-474c-b092-b0dd880c07aa","recorded_on":"2020-02-24T09:25:20.014810+00:00"}
以及许多其他具有不同 type
值的记录。
我想要做的是将这些数据的 payload
插入到各自对应的 elasticsearch 索引中。例如 type
= OpenLoyalty.Component.Customer.Domain.Event.AssignedAccountToCustomer 的数据必须进入 oloy.account_details
索引。索引中的文档应该是这样的:
{
"_index": "oloy.account_details",
"_type": "OpenLoyalty\\Component\\Account\\Domain\\ReadModel\\AccountDetails",
"_id": "AXGjvhDQUCC0J4ATWiCP",
"_version": 1,
"_score": 1,
"_source": {
"accountId": "5a9bdb83-f7e8-442c-b06f-387e1b1e95a7",
"customerId": "11111111-0000-474c-b092-b0dd880c07e1"
}
}
我想知道我应该为Logstash 设置什么配置。到目前为止我有这个:
# Read rows from the PostgreSQL `events` table over JDBC.
input {
  jdbc {
    # JDBC driver jar and driver class used to open the connection.
    jdbc_driver_library => "C:\logstash\postgresql-42.2.12.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    # Database location and credentials.
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/openloyalty"
    jdbc_user => "openloyalty"
    jdbc_password => "openloyalty"
    # Query that selects every column of every row in `events`.
    statement => "SELECT * from events"
  }
}

# Send the events to the Elasticsearch node on localhost.
# The "???" values are placeholders that still have to be decided.
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "???"
    document_type => "???"
    document_id => "???"
  }
}
您的答案在这里:
# Read the application log file, starting at the beginning of the file.
input {
  file {
    path => "C:/Users/Khatere Tajfar/Desktop/Log/application.log"
    start_position => "beginning"
    ignore_older => 0
  }
}

filter {
  # Split each INFO log line into the routing values (type, index, id) and
  # the escaped JSON document carried in the "model" attribute (jsonpart).
  grok {
    match => { "message" => "%{GREEDYDATA:part1} INFO %{GREEDYDATA:part2}\{\"indexType\"\:\"%{GREEDYDATA:type}\"\,\"indexName\"\:\"%{GREEDYDATA:index}\"\,\"model\"\:\"%{GREEDYDATA:jsonpart}\"\,\"id\"\:\"%{GREEDYDATA:id}\"\}" }
  }

  mutate {
    # Delete every backslash so the escaped JSON in jsonpart can be parsed.
    # The class has to be written "[\\]": a class that ends in an escaped
    # bracket, such as "[\]", is never closed and is not a valid regex.
    # Replacing directly with "" also avoids a temporary marker character
    # that would wipe out genuine occurrences of that character in the data.
    gsub => ["jsonpart", "[\\]+", ""]
    # Drop the grok leftovers and the bookkeeping fields added by the input.
    remove_field => ["part1", "part2", "message", "path", "host", "@timestamp", "@version"]
  }

  # Expand the cleaned JSON string into top-level event fields.
  json {
    source => "jsonpart"
  }

  mutate {
    remove_field => ["jsonpart"]
  }

  # Copy the routing values under [@metadata] and remove the originals, so
  # they are available to the output settings below without being stored in
  # the indexed document.
  mutate { add_field => { "[@metadata][type]" => "%{type}" } }
  mutate { remove_field => ["type"] }
  mutate { add_field => { "[@metadata][index]" => "%{index}" } }
  mutate { remove_field => ["index"] }
  mutate { add_field => { "[@metadata][id]" => "%{id}" } }
  mutate { remove_field => ["id"] }
}

output {
  # Echo every event to the console as one JSON object per line.
  stdout { codec => json_lines }

  # Route each event to the index / type / id extracted from its log line.
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{[@metadata][index]}"
    document_type => "%{[@metadata][type]}"
    document_id => "%{[@metadata][id]}"
  }
}