Nested document to elasticsearch using logstash

Hi all, I am trying to index documents from an MS SQL Server into Elasticsearch using Logstash. I want my documents to be ingested as nested documents, but I am getting an aggregate exception error.

Here is all of my code:
Create table department(
ID Int identity(1,1) not null,
Name varchar(100)
)
Insert into department(Name)
Select 'IT Application development'
union all
Select 'HR & Marketing'
Create table Employee(
ID Int identity(1,1) not null,
emp_Name varchar(100),
dept_Id int
)
Insert into Employee(emp_Name,dept_Id)
Select 'Mohan',1
union all
Select 'parthi',1
union all
Select 'vignesh',1
Insert into Employee(emp_Name,dept_Id)
Select 'Suresh',2
union all
Select 'Jithesh',2
union all
Select 'Venkat',2
The final SELECT statement:
SELECT
De.id AS id,De.name AS deptname,Emp.id AS empid,Emp.emp_name AS empname
FROM department De LEFT JOIN employee Emp ON De.id = Emp.dept_Id
ORDER BY De.id
The result should look like this:
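(The original screenshot is unavailable; reconstructed from the inserts above, the join returns:)

id | deptname                   | empid | empname
---+-----------------------------+-------+---------
 1 | IT Application development |     1 | Mohan
 1 | IT Application development |     2 | parthi
 1 | IT Application development |     3 | vignesh
 2 | HR & Marketing             |     4 | Suresh
 2 | HR & Marketing             |     5 | Jithesh
 2 | HR & Marketing             |     6 | Venkat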
My Elasticsearch mapping:
PUT /departments
{
  "mappings": {
    "properties": {
      "id": {
        "type": "integer"
      },
      "deptname": {
        "type": "text"
      },
      "employee_details": {
        "type": "nested",
        "properties": {
          "empid": {
            "type": "integer"
          },
          "empname": {
            "type": "text"
          }
        }
      }
    }
  }
}
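With employee_details mapped as nested, individual employees can later be matched with a nested query. A minimal sketch against this mapping:

GET /departments/_search
{
  "query": {
    "nested": {
      "path": "employee_details",
      "query": {
        "match": { "employee_details.empname": "Mohan" }
      }
    }
  }
}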
My Logstash config file:
input {
  jdbc {
    jdbc_driver_library => ""
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://EC2AMAZ-J90JR4A\SQLEXPRESS:1433;databaseName=xxxx;"
    jdbc_user => "xxxx"
    jdbc_password => "xxxx"
    statement => "SELECT
      De.id AS id, De.name AS deptname, Emp.id AS empid, Emp.emp_name AS empname
      FROM department De LEFT JOIN employee Emp ON De.id = Emp.dept_Id
      ORDER BY De.id"
  }
}
filter {
  aggregate {
    task_id => "%{id}"
    code => "
      map['id'] = event['id']
      map['deptname'] = event['deptname']
      map['employee_details'] ||= []
      map['employee_details'] << {'empId' => event['empid'], 'empname' => event['empname'] }
    "
    push_previous_map_as_event => true
    timeout => 5
    timeout_tags => ['aggregated']
  }
}
output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => "https://d9bc7cbca5ec49ea96a6ea683f70caca.eastus2.azure.elastic-cloud.com:4567"
    user => "elastic"
    password => "****"
    index => "departments"
    action => "index"
    document_type => "departments"
    document_id => "%{id}"
  }
}
The error I got while running Logstash:

Elasticsearch screenshot for reference:

My Elasticsearch output should look like this:
{
"took" : 398,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "departments",
"_type" : "_doc",
"_id" : "2",
"_score" : 1.0,
"_source" : {
"id" : 1,
"deptname" : "IT Application development"
"employee_details" : [
{
"empid" : 1,
"empname" : "Mohan"
},
{
"empid" : 2,
"empname" : "Parthi"
},
{
"empid" : 3,
"empname" : "Vignesh"
}
]
}
}
]
}
}
Can anyone help me solve this? I want every employee's empname and empid to be inserted as a nested document under the respective department. Thanks in advance.
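A likely culprit for the aggregate exception in the config above is the legacy event['field'] syntax: direct hash-style access to events was removed in Logstash 5.0, so the aggregate code block needs the event.get API instead. A corrected sketch of the same filter follows (the 'empid' key is lowercased here to match the mapping; note also that the aggregate filter requires running Logstash with a single pipeline worker, e.g. -w 1, so rows arrive in order):

filter {
  aggregate {
    task_id => "%{id}"
    code => "
      map['id'] = event.get('id')
      map['deptname'] = event.get('deptname')
      map['employee_details'] ||= []
      map['employee_details'] << { 'empid' => event.get('empid'), 'empname' => event.get('empname') }
      event.cancel()  # drop the per-row event; only the aggregated map is pushed
    "
    push_previous_map_as_event => true
    timeout => 5
    timeout_tags => ['aggregated']
  }
}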
I used JDBC_STREAMING instead of the aggregate filter and it works fine. It may be helpful to anyone viewing this post.
input {
  jdbc {
    jdbc_driver_library => "D:/Users/xxxx/Desktop/driver/mssql-jdbc-7.4.1.jre12-shaded.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://EC2AMAZ-J90JR4A\SQLEXPRESS:1433;databaseName=xxx;"
    jdbc_user => "xxx"
    jdbc_password => "xxxx"
    statement => "Select Policyholdername,Age,Policynumber,Dob,Client_Address,is_active from policy"
  }
}
filter {
  jdbc_streaming {
    jdbc_driver_library => "D:/Users/xxxx/Desktop/driver/mssql-jdbc-7.4.1.jre12-shaded.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://EC2AMAZ-J90JR4A\SQLEXPRESS:1433;databaseName=xxxx;"
    jdbc_user => "xxxx"
    jdbc_password => "xxxx"
    statement => "select claimnumber,claimtype,is_active from claim where policynumber = :policynumber"
    parameters => { "policynumber" => "policynumber" }
    target => "claim_details"
  }
}
output {
  elasticsearch {
    hosts => "https://e5a4a4a4de7940d9b12674d62eac9762.eastus2.azure.elastic-cloud.com:9243"
    user => "elastic"
    password => "xxxx"
    index => "xxxx"
    action => "index"
    document_type => "_doc"
    document_id => "%{policynumber}"
  }
  stdout { codec => rubydebug }
}
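With this filter, each policy event is enriched with a claim_details field holding the array of rows returned by the lookup query. For claim_details to behave as nested documents at query time, the target index still needs a nested mapping. A sketch, with a hypothetical index name and assumed field types:

PUT /policy
{
  "mappings": {
    "properties": {
      "claim_details": {
        "type": "nested",
        "properties": {
          "claimnumber": { "type": "keyword" },
          "claimtype": { "type": "keyword" },
          "is_active": { "type": "boolean" }
        }
      }
    }
  }
}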
You can also try using aggregate in the Logstash filter plugin. Check this: Inserting Nested Objects using Logstash
https://xyzcoder.github.io/2020/07/29/indexing-documents-using-logstash-and-python.html
I am only showing a single object here, but we can also have an array with multiple items (see the sketch after the config below).
input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/javalib/mssql-jdbc-8.2.2.jre11.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://host.docker.internal;database=StackOverflow2010;user=pavan;password=pavankumar@123"
    jdbc_user => "pavan"
    jdbc_password => "pavankumar@123"
    statement => "select top 500 p.Id as PostId,p.AcceptedAnswerId,p.AnswerCount,p.Body,u.Id as userid,u.DisplayName,u.Location
      from StackOverflow2010.dbo.Posts p inner join StackOverflow2010.dbo.Users u
      on p.OwnerUserId=u.Id"
  }
}
filter {
  aggregate {
    task_id => "%{postid}"
    code => "
      map['postid'] = event.get('postid')
      map['accepted_answer_id'] = event.get('acceptedanswerid')
      map['answer_count'] = event.get('answercount')
      map['body'] = event.get('body')
      map['user'] = {
        'id' => event.get('userid'),
        'displayname' => event.get('displayname'),
        'location' => event.get('location')
      }
      event.cancel()"
    push_previous_map_as_event => true
    timeout => 30
  }
}
output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200", "http://elasticsearch:9200"]
    index => "stackoverflow_top"
  }
  stdout {
    codec => rubydebug
  }
}
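For the array case mentioned above, only the code block changes: initialize an array in the map and append one item per joined row. A sketch of that variant (the users field name is illustrative):

filter {
  aggregate {
    task_id => "%{postid}"
    code => "
      map['postid'] ||= event.get('postid')
      map['body'] ||= event.get('body')
      map['users'] ||= []   # hypothetical array field, one entry per joined row
      map['users'] << {
        'id' => event.get('userid'),
        'displayname' => event.get('displayname'),
        'location' => event.get('location')
      }
      event.cancel()"
    push_previous_map_as_event => true
    timeout => 30
  }
}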
So that post covers multiple ways of inserting data, such as aggregate, JDBC streaming, and other scenarios.