AWS Kinesis Firehose unable to index data into AWS Elasticsearch

I'm trying to send data from Amazon Kinesis Data Firehose to Amazon Elasticsearch Service, but Firehose logs an error saying 503 Service Unavailable. However, I can reach the Elasticsearch endpoint (https://vpc-XXX.<region>.es.amazonaws.com) and run queries against it. I also went through "How can I prevent HTTP 503 Service Unavailable errors in Amazon Elasticsearch Service?" and can confirm that my setup has enough resources.

This is the error I get in the S3 backup bucket that stores the failed records:

{
    "attemptsMade": 8,
    "arrivalTimestamp": 1599748282943,
    "errorCode": "ES.ServiceException",
    "errorMessage": "Error received from Elasticsearch cluster. <html><body><h1>503 Service Unavailable</h1>\nNo server is available to handle this request.\n</body></html>",
    "attemptEndingTimestamp": 1599748643460,
    "rawData": "eyJ0aWNrZXJfc3ltYm9sIjoiQUxZIiwic2VjdG9yIjoiRU5FUkdZIiwiY2hhbmdlIjotNi4zNSwicHJpY2UiOjg4LjgzfQ==",
    "subsequenceNumber": 0,
    "esDocumentId": "49610662085822146490768158474738345331794592496281976834.0",
    "esIndexName": "prometheus-2020-09",
    "esTypeName": ""
}
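The rawData field holds the original record, base64-encoded. Decoding it is a quick way to check that the payload itself is well-formed, which points the investigation at delivery rather than at the data. A minimal sketch using Terraform's built-in base64decode and jsondecode functions; the local and output names are hypothetical, and the same expression can also be evaluated in terraform console:

```hcl
# Hypothetical helper: decode the rawData of one failed record
# (value copied from the S3 backup file shown above).
locals {
  failed_raw_data = "eyJ0aWNrZXJfc3ltYm9sIjoiQUxZIiwic2VjdG9yIjoiRU5FUkdZIiwiY2hhbmdlIjotNi4zNSwicHJpY2UiOjg4LjgzfQ=="

  # base64 -> JSON string -> Terraform object
  failed_record = jsondecode(base64decode(local.failed_raw_data))
}

output "failed_record" {
  value = local.failed_record
}
```

For this record, the decoded JSON is {"ticker_symbol":"ALY","sector":"ENERGY","change":-6.35,"price":88.83}.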

Does anyone know how to fix this and get the data indexed into Elasticsearch?

It turns out my problem was that I had selected the wrong security group.


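For the delivery stream, I was using the same security group that is attached to the Elasticsearch instance (which I named elasticsearch-${domain_name}; it allows TCP ingress/egress on port 443 to/from the … security group). I should have selected the firehose_es security group instead.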

As requested in the comments, here is the Terraform config for the firehose_es security group:

resource "aws_security_group" "firehose_es" {
  name        = "firehose_es"
  description = "Firehose to send logs to Elasticsearch"
  vpc_id      = module.networking.aws_vpc_id
}

# Allow HTTPS into the Firehose ENIs from anywhere in 10.0.0.0/8
resource "aws_security_group_rule" "firehose_es_https_ingress" {
  type              = "ingress"
  from_port         = 443
  to_port           = 443
  protocol          = "tcp"
  security_group_id = aws_security_group.firehose_es.id
  cidr_blocks       = ["10.0.0.0/8"]
}

# Allow HTTPS out of the Firehose ENIs only to members of the Elasticsearch security group
resource "aws_security_group_rule" "firehose_es_https_egress" {
  type                     = "egress"
  from_port                = 443
  to_port                  = 443
  protocol                 = "tcp"
  security_group_id        = aws_security_group.firehose_es.id
  source_security_group_id = aws_security_group.elasticsearch.id
}
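The egress rule above only lets traffic leave the Firehose ENIs; the security group attached to the Elasticsearch domain also has to accept it. A sketch of the matching ingress rule, assuming aws_security_group.elasticsearch is the group attached to the domain (the rule's resource name is hypothetical):

```hcl
# Hypothetical counterpart rule: the Elasticsearch domain's security group
# accepts HTTPS from any ENI that carries the firehose_es security group.
resource "aws_security_group_rule" "elasticsearch_https_ingress_from_firehose" {
  type                     = "ingress"
  from_port                = 443
  to_port                  = 443
  protocol                 = "tcp"
  security_group_id        = aws_security_group.elasticsearch.id
  source_security_group_id = aws_security_group.firehose_es.id
}
```

Referencing the source security group instead of a CIDR range keeps the domain closed to everything except resources that carry the firehose_es group.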

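Another thing I fixed before asking this question (and possibly the reason some of you are running into this problem) was using the correct role and attaching the correct policy to it. Here are the policy and the delivery stream (as Terraform config):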

// https://docs.aws.amazon.com/firehose/latest/dev/controlling-access.html
data "aws_iam_policy_document" "firehose_es_policy_specific" {
  statement {
    actions = [
      "s3:AbortMultipartUpload",
      "s3:GetBucketLocation",
      "s3:GetObject",
      "s3:ListBucket",
      "s3:ListBucketMultipartUploads",
      "s3:PutObject"
    ]
    resources = [
      aws_s3_bucket.firehose.arn,
      "${aws_s3_bucket.firehose.arn}/*"
    ]
  }

  statement {
    actions = [
      "es:DescribeElasticsearchDomain",
      "es:DescribeElasticsearchDomains",
      "es:DescribeElasticsearchDomainConfig",
      "es:ESHttpPost",
      "es:ESHttpPut"
    ]

    resources = [
      var.elasticsearch_domain_arn,
      "${var.elasticsearch_domain_arn}/*",
    ]
  }

  statement {
    actions = [
      "es:ESHttpGet"
    ]

    resources = [
      "${var.elasticsearch_domain_arn}/_all/_settings",
      "${var.elasticsearch_domain_arn}/_cluster/stats",
      "${var.elasticsearch_domain_arn}/${var.name_prefix}${var.name}_${var.app}*/_mapping/type-name",
      "${var.elasticsearch_domain_arn}/_nodes",
      "${var.elasticsearch_domain_arn}/_nodes/stats",
      "${var.elasticsearch_domain_arn}/_nodes/*/stats",
      "${var.elasticsearch_domain_arn}/_stats",
      "${var.elasticsearch_domain_arn}/${var.name_prefix}${var.name}_${var.app}*/_stats"
    ]
  }

  statement {
    actions = [
      "ec2:DescribeVpcs",
      "ec2:DescribeVpcAttribute",
      "ec2:DescribeSubnets",
      "ec2:DescribeSecurityGroups",
      "ec2:DescribeNetworkInterfaces",
      "ec2:CreateNetworkInterface",
      "ec2:CreateNetworkInterfacePermission",
      "ec2:DeleteNetworkInterface",
    ]

    resources = [
      "*"
    ]
  }
}
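The delivery stream references aws_iam_role.firehose_es, but the role itself is not shown. A minimal sketch of how it might be defined, assuming the policy document above is attached as an inline policy; the data source firehose_assume_role and the role and policy names are hypothetical:

```hcl
# Trust policy: lets the Kinesis Data Firehose service assume the role.
data "aws_iam_policy_document" "firehose_assume_role" {
  statement {
    actions = ["sts:AssumeRole"]

    principals {
      type        = "Service"
      identifiers = ["firehose.amazonaws.com"]
    }
  }
}

resource "aws_iam_role" "firehose_es" {
  name               = "firehose_es"
  assume_role_policy = data.aws_iam_policy_document.firehose_assume_role.json
}

# Attach the S3, Elasticsearch and EC2 (VPC) permissions defined above as an inline policy.
resource "aws_iam_role_policy" "firehose_es" {
  name   = "firehose_es_policy_specific"
  role   = aws_iam_role.firehose_es.id
  policy = data.aws_iam_policy_document.firehose_es_policy_specific.json
}
```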

resource "aws_kinesis_firehose_delivery_stream" "ecs" {
  name        = "${var.name_prefix}${var.name}_${var.app}"
  destination = "elasticsearch"

  s3_configuration {
    role_arn           = aws_iam_role.firehose_es.arn
    bucket_arn         = aws_s3_bucket.firehose.arn
    buffer_interval    = 60
    compression_format = "GZIP"
  }

  elasticsearch_configuration {
    domain_arn = var.elasticsearch_domain_arn
    role_arn   = aws_iam_role.firehose_es.arn

    # If Firehose cannot deliver to Elasticsearch, logs are sent to S3
    s3_backup_mode = "FailedDocumentsOnly"

    buffering_interval = 60
    buffering_size     = 5

    index_name            = "${var.name_prefix}${var.name}_${var.app}"
    index_rotation_period = "OneMonth"

    vpc_config {
      subnet_ids         = var.elasticsearch_subnet_ids
      security_group_ids = [var.firehose_security_group_id]
      role_arn           = aws_iam_role.firehose_es.arn
    }
  }
}
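The vpc_config block takes its security group from var.firehose_security_group_id, so the fix comes down to passing the firehose_es group into that variable rather than the Elasticsearch group. A minimal sketch of that wiring, assuming the delivery stream lives in a child module; the module name firehose_logs and the source path ./modules/firehose are hypothetical:

```hcl
# In the module that defines aws_kinesis_firehose_delivery_stream.ecs:
variable "firehose_security_group_id" {
  description = "Security group for the Firehose ENIs (firehose_es, not the Elasticsearch domain's group)"
  type        = string
}

# In the calling configuration (module name and source path are hypothetical):
module "firehose_logs" {
  source = "./modules/firehose"

  # Pass the Firehose security group, not aws_security_group.elasticsearch.id
  firehose_security_group_id = aws_security_group.firehose_es.id

  # ...remaining inputs such as elasticsearch_domain_arn and elasticsearch_subnet_ids
}
```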

After rereading the "Controlling Access with Amazon Kinesis Data Firehose" article, I finally understood my mistake.