在 logstash 中如何获取日期部分以附加在索引名称中
In logstash how to get the date part to append in index name
我正在尝试使用以下 logstash conf 文件将数据加载到 Elasticsearch 中。我必须创建一个索引名称并附加其中一列的日期部分 (dw_isnert_dt)。当我没有聚合这些数据来创建地图时,它工作正常,但聚合后它不能按预期工作。因此,假设 dw_insert_dt 值为“2021-04-27”,那么索引名称将为“test_index_2021_04_27”。请为我的生产部署提供帮助。
input {
jdbc {
jdbc_connection_string => "jdbc:oracle:thin:@host:1521/ORCL"
jdbc_user => "username"
jdbc_password => "password"
jdbc_validate_connection => true
jdbc_driver_library => "/home/logstash-6.2.4/jdbc_drivers/OJDBC-Full/ojdbc7.jar"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
schedule => "35 4 * * *"
statement_filepath =>"/home/logstash-6.2.4/queries/dummy.sql"
}
}
filter {
ruby {
code => "event.set('dw_insert_dt_str', event.get('dw_insert_dt').time.localtime.strftime('%Y_%m_%d'))"
}
aggregate {
task_id => "%{pkey}"
code => "
map['pkey'] = event.get('pkey')
map['name'] = event.get('name')
map['role'] = event.get('role')
map['dw_insert_dt'] = event.get('dw_insert_dt')
map['CUST_ADDRESS'] ||= []
customer_address_list = {'address_id' => event.get('address_id'), 'addr1' => event.get('addr1'),'city' => event.get('city') , 'state' => event.get('state')}
if (event.get('address_id') != nil )
if ! map['CUST_ADDRESS'].include?(customer_address_list)
map['CUST_ADDRESS'] << customer_address_list
end
end
event.cancel()
"
push_previous_map_as_event => true
timeout => 20
}
}
output {
elasticsearch {
index => "test_index_%{dw_insert_dt_str}"
hosts => [ "host:port" ]
}
}
请尽早提供帮助。
谢谢,
迪帕克
如果您使用 push_previous_map_as_event 选项,那么事件中唯一存在的字段就是您添加到地图的字段。在这种情况下,您将 [dw_insert_dt_str] 添加到原始事件,但您 event.cancel 。您需要将其添加到地图中。
您可以从活动中复制它
map['dw_insert_dt_str'] = event.get('dw_insert_dt_str')
或将 ruby 代码移至聚合过滤器
map['dw_insert_dt_str'] = event.get('dw_insert_dt').time.localtime.strftime('%Y_%m_%d')
我正在尝试使用以下 logstash conf 文件将数据加载到 Elasticsearch 中。我必须创建一个索引名称并附加其中一列的日期部分 (dw_isnert_dt)。当我没有聚合这些数据来创建地图时,它工作正常,但聚合后它不能按预期工作。因此,假设 dw_insert_dt 值为“2021-04-27”,那么索引名称将为“test_index_2021_04_27”。请为我的生产部署提供帮助。
input {
jdbc {
jdbc_connection_string => "jdbc:oracle:thin:@host:1521/ORCL"
jdbc_user => "username"
jdbc_password => "password"
jdbc_validate_connection => true
jdbc_driver_library => "/home/logstash-6.2.4/jdbc_drivers/OJDBC-Full/ojdbc7.jar"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
schedule => "35 4 * * *"
statement_filepath =>"/home/logstash-6.2.4/queries/dummy.sql"
}
}
filter {
ruby {
code => "event.set('dw_insert_dt_str', event.get('dw_insert_dt').time.localtime.strftime('%Y_%m_%d'))"
}
aggregate {
task_id => "%{pkey}"
code => "
map['pkey'] = event.get('pkey')
map['name'] = event.get('name')
map['role'] = event.get('role')
map['dw_insert_dt'] = event.get('dw_insert_dt')
map['CUST_ADDRESS'] ||= []
customer_address_list = {'address_id' => event.get('address_id'), 'addr1' => event.get('addr1'),'city' => event.get('city') , 'state' => event.get('state')}
if (event.get('address_id') != nil )
if ! map['CUST_ADDRESS'].include?(customer_address_list)
map['CUST_ADDRESS'] << customer_address_list
end
end
event.cancel()
"
push_previous_map_as_event => true
timeout => 20
}
}
output {
elasticsearch {
index => "test_index_%{dw_insert_dt_str}"
hosts => [ "host:port" ]
}
}
请尽早提供帮助。
谢谢, 迪帕克
如果您使用 push_previous_map_as_event 选项,那么事件中唯一存在的字段就是您添加到地图的字段。在这种情况下,您将 [dw_insert_dt_str] 添加到原始事件,但您 event.cancel 。您需要将其添加到地图中。
您可以从活动中复制它
map['dw_insert_dt_str'] = event.get('dw_insert_dt_str')
或将 ruby 代码移至聚合过滤器
map['dw_insert_dt_str'] = event.get('dw_insert_dt').time.localtime.strftime('%Y_%m_%d')