Replicating single-table MySQL data to Elasticsearch using the fluentd mysql-replicator plugin

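I am using the fluentd mysql-replicator plugin to push data from a single MySQL table to Elasticsearch, but fluentd throws the error below. There is not much documentation about it online, so please help me resolve this error.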

I am running td-agent v3 (fluentd 1.10.x) on a local Windows machine, with Elasticsearch 7.7.1. I start it from the Td-agent Command Prompt with this configuration file: C:\opt\td-agent>fluentd -c etc\td-agent\td-agent.conf

2020-12-07 13:31:42 +0530 [info]: #0 starting fluentd worker pid=12144 ppid=16648 worker=0
2020-12-07 13:31:42 +0530 [info]: #0 fluentd worker is now running worker=0
2020-12-07 13:31:43 +0530 [info]: #0 mysql_replicator: finished execution :tag=>replicator.securedb.testfluentd.${event}.${primary_key} :rows_count=>2 :elapsed_time=>0.07 sec
2020-12-07 13:31:53 +0530 [info]: #0 mysql_replicator: finished execution :tag=>replicator.securedb.testfluentd.${event}.${primary_key} :rows_count=>2 :elapsed_time=>0.26 sec
2020-12-07 13:32:03 +0530 [info]: #0 mysql_replicator: finished execution :tag=>replicator.securedb.testfluentd.${event}.${primary_key} :rows_count=>2 :elapsed_time=>0.24 sec
2020-12-07 13:32:14 +0530 [info]: #0 mysql_replicator: finished execution :tag=>replicator.securedb.testfluentd.${event}.${primary_key} :rows_count=>2 :elapsed_time=>0.25 sec
2020-12-07 13:32:24 +0530 [info]: #0 mysql_replicator: finished execution :tag=>replicator.securedb.testfluentd.${event}.${primary_key} :rows_count=>2 :elapsed_time=>0.28 sec
2020-12-07 13:32:34 +0530 [info]: #0 mysql_replicator: finished execution :tag=>replicator.securedb.testfluentd.${event}.${primary_key} :rows_count=>2 :elapsed_time=>0.23 sec
2020-12-07 13:32:44 +0530 [info]: #0 mysql_replicator: finished execution :tag=>replicator.securedb.testfluentd.${event}.${primary_key} :rows_count=>2 :elapsed_time=>0.27 sec
2020-12-07 13:32:48 +0530 [warn]: #0 failed to flush the buffer. retry_time=0 next_retry_seconds=2020-12-07 13:32:49 +0530 chunk="5b5db3ca23998ad6be37a80c95259443" error_class=IndexError error="undefined group name reference: type_name"
  2020-12-07 13:32:48 +0530 [warn]: #0 C:/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-mysql-replicator-1.0.1/lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb:65:in `[]'
  2020-12-07 13:32:48 +0530 [warn]: #0 C:/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-mysql-replicator-1.0.1/lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb:65:in `block in write'
  2020-12-07 13:32:48 +0530 [warn]: #0 C:/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.11.1/lib/fluent/event.rb:325:in `each'
  2020-12-07 13:32:48 +0530 [warn]: #0 C:/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.11.1/lib/fluent/event.rb:325:in `block in each'
  2020-12-07 13:32:48 +0530 [warn]: #0 C:/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer/memory_chunk.rb:81:in `open'
  2020-12-07 13:32:48 +0530 [warn]: #0 C:/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer/memory_chunk.rb:81:in `open'
  2020-12-07 13:32:48 +0530 [warn]: #0 C:/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.11.1/lib/fluent/event.rb:324:in `each'
  2020-12-07 13:32:48 +0530 [warn]: #0 C:/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-mysql-replicator-1.0.1/lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb:62:in `write'
  2020-12-07 13:32:48 +0530 [warn]: #0 C:/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.11.1/lib/fluent/plugin/output.rb:1133:in `try_flush'
  2020-12-07 13:32:48 +0530 [warn]: #0 C:/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.11.1/lib/fluent/plugin/output.rb:1439:in `flush_thread_run'
  2020-12-07 13:32:48 +0530 [warn]: #0 C:/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.11.1/lib/fluent/plugin/output.rb:461:in `block (2 levels) in start'
  2020-12-07 13:32:48 +0530 [warn]: #0 C:/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.11.1/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2020-12-07 13:32:49 +0530 [warn]: #0 failed to flush the buffer. retry_time=1 next_retry_seconds=2020-12-07 13:32:50 +0530 chunk="5b5db3ca23998ad6be37a80c95259443" error_class=IndexError error="undefined group name reference: type_name"
  2020-12-07 13:32:49 +0530 [warn]: #0 suppressed same stacktrace

Below is my td-agent configuration file (the fluentd configuration file). Do I also need to specify the Elasticsearch index name, and if so, where?

<source>
  @type mysql_replicator

  # Set connection settings for replicate source.
  host 172.16.xx.xx
  port xxxxx
  username root
  password xxxxx
  database sarvendra

  # Set replicate query configuration.
  query SELECT tutorial_id, tutorial_title from testfluentd;
  primary_key tutorial_id
  interval 10s

  # Also detect deletion events, not only insert/update events. (default: yes)
  # `enable_delete no` is useful when the query only tracks recently updated records, for example:
  # `SELECT * FROM search_test WHERE DATE_ADD(updated_at, INTERVAL 5 MINUTE) > NOW();`
  enable_delete no

  # Format output tag for each events. Placeholders usage as described below.
  tag replicator.ssarvendra.testfluentd.${event}.${primary_key}
  # ${event} : the variation of row event type by insert/update/delete.
  # ${primary_key} : the value of `replicator_manager.settings.primary_key` in manager table.
</source>

<match replicator.**>
  @type mysql_replicator_elasticsearch

  # Set Elasticsearch connection.
  host localhost
  port 9200
  # You can configure to use SSL for connecting to Elasticsearch.
  # ssl true

  # Basic authentication credentials can be configured
  # username basic_auth_username
  # password basic_auth_password

  # Set Elasticsearch index, type, and unique id (primary_key) from tag.
  tag_format (?<index_name>[^\.]+)\.(?<event>[^\.]+)\.(?<primary_key>[^\.]+)$

  # Set frequency of sending bulk request to Elasticsearch node.
  flush_interval 5s

  # Set maximum retry interval (required fluentd >= 0.10.41)
  #max_retry_wait 1800

  # Queued chunks are flushed at shutdown process.
  # It's sample for td-agent. If you use Yamabiko, replace path from 'td-agent' to 'yamabiko'.
  flush_at_shutdown yes
  buffer_type file
  buffer_path /var/log/td-agent/buffer/mysql_replicator_elasticsearch
</match>

You should use the following tag_format:

tag_format (?<index_name>[^\.]+)\.(?<type_name>[^\.]+)\.(?<event>[^\.]+)\.(?<primary_key>[^\.]+)$

And use this tag in the source configuration:

tag replicator.ssarvendra._doc.${event}.${primary_key}

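The reason is that in ES 7 there are no longer mapping types, and _doc is the only default type allowed. The stack trace also shows that the plugin looks up a type_name group in the tag match, so a tag_format that does not define that group fails with the IndexError "undefined group name reference: type_name".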
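
To see why the original tag_format fails, here is a minimal Ruby sketch that applies both patterns to sample tags. It is not the plugin's code: `extract_fields` is a hypothetical helper, and the sample tags assume that `${event}` expands to `insert` and `${primary_key}` to the column name `tutorial_id`. It only relies on the standard behavior that `MatchData#[]` raises IndexError for a group name the regex does not define.

```ruby
# Pattern from the question: it defines no `type_name` group.
ORIGINAL_FORMAT = /(?<index_name>[^\.]+)\.(?<event>[^\.]+)\.(?<primary_key>[^\.]+)$/

# Pattern from the answer: it adds the `type_name` group.
FIXED_FORMAT = /(?<index_name>[^\.]+)\.(?<type_name>[^\.]+)\.(?<event>[^\.]+)\.(?<primary_key>[^\.]+)$/

# Hypothetical helper (an illustration, not the plugin's implementation):
# reads the four named groups out of a tag.
def extract_fields(tag, format)
  match = format.match(tag)
  return nil if match.nil?

  {
    index: match['index_name'],
    type: match['type_name'], # raises IndexError when the group is not defined
    event: match['event'],
    primary_key: match['primary_key']
  }
end

# Assumed expansions of the tags from the question and from the answer.
old_tag = 'replicator.ssarvendra.testfluentd.insert.tutorial_id'
new_tag = 'replicator.ssarvendra._doc.insert.tutorial_id'

begin
  extract_fields(old_tag, ORIGINAL_FORMAT)
rescue IndexError => e
  puts "original tag_format: #{e.class}: #{e.message}"
end

p extract_fields(new_tag, FIXED_FORMAT)
```

With the corrected pattern, the second tag segment becomes the index name and `_doc` becomes the type, which suggests the index name is taken from the tag rather than from a separate setting.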