在 logstash 中如何获取日期部分以附加在索引名称中

In logstash how to get the date part to append in index name

我正在尝试使用以下 logstash conf 文件将数据加载到 Elasticsearch 中。我必须创建一个索引名称并附加其中一列的日期部分 (dw_isnert_dt)。当我没有聚合这些数据来创建地图时,它工作正常,但聚合后它不能按预期工作。因此,假设 dw_insert_dt 值为“2021-04-27”,那么索引名称将为“test_index_2021_04_27”。请为我的生产部署提供帮助。

input {
    jdbc {
        jdbc_connection_string => "jdbc:oracle:thin:@host:1521/ORCL"
        jdbc_user => "username"
        jdbc_password => "password"
        jdbc_validate_connection => true
        jdbc_driver_library => "/home/logstash-6.2.4/jdbc_drivers/OJDBC-Full/ojdbc7.jar"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        schedule => "35 4 * * *"
        statement_filepath =>"/home/logstash-6.2.4/queries/dummy.sql"
        }
}

filter {
        ruby {
        code => "event.set('dw_insert_dt_str', event.get('dw_insert_dt').time.localtime.strftime('%Y_%m_%d'))"
    }

    aggregate {
    task_id => "%{pkey}"
    code => "
        map['pkey']                 = event.get('pkey')
        map['name']                 = event.get('name')
        map['role']                 = event.get('role')
        map['dw_insert_dt']         = event.get('dw_insert_dt')

        map['CUST_ADDRESS'] ||= []
        customer_address_list = {'address_id' => event.get('address_id'), 'addr1' => event.get('addr1'),'city' => event.get('city') , 'state' => event.get('state')}
        if (event.get('address_id') != nil )
            if ! map['CUST_ADDRESS'].include?(customer_address_list)
            map['CUST_ADDRESS'] << customer_address_list
            end
        end

       event.cancel()
       "
       push_previous_map_as_event => true
       timeout => 20
     }
   }

output {
    elasticsearch {
        index => "test_index_%{dw_insert_dt_str}"
        hosts => [ "host:port" ]
    }
}

请尽早提供帮助。

谢谢, 迪帕克

如果您使用 push_previous_map_as_event 选项,那么事件中唯一存在的字段就是您添加到地图的字段。在这种情况下,您将 [dw_insert_dt_str] 添加到原始事件,但您 event.cancel 。您需要将其添加到地图中。

您可以从活动中复制它

map['dw_insert_dt_str']         = event.get('dw_insert_dt_str')

或将 ruby 代码移至聚合过滤器

map['dw_insert_dt_str']         = event.get('dw_insert_dt').time.localtime.strftime('%Y_%m_%d')