Connect to Amazon MSK cluster

I'm trying to set up an Amazon MSK cluster and connect to it from a Lambda function. The Lambda function will be a producer of messages, not a consumer.

I'm using the Serverless Framework to provision everything, and I've added the following to my serverless.yml, which seems to work fine.

    MSK:
      Type: AWS::MSK::Cluster
      Properties:
        ClusterName: kafkaOne
        KafkaVersion: 2.2.1
        NumberOfBrokerNodes: 3
        BrokerNodeGroupInfo:
          InstanceType: kafka.t3.small
          ClientSubnets:
            - Ref: PrivateSubnet1
            - Ref: PrivateSubnet2
            - Ref: PrivateSubnet3

But when I try to connect to the cluster to actually send messages, I'm not sure how to get the connection string. Is it supposed to be the ZookeeperConnectString? I'm new to Kafka/MSK, so I may be missing something obvious.

Any advice is much appreciated. Cheers.

I don't know which language you're writing in, so I'll share the code I wrote in Go.

Essentially, you connect to an MSK cluster the same way you would connect to a standalone Kafka instance: you "connect" to, or more precisely write to, the MSK cluster via its brokers.

I'm using the segmentio/kafka-go library. My function that sends events to the MSK cluster looks like this:

import (
    "context"
    "encoding/json"
    "fmt"
    "os"
    "time"

    "github.com/segmentio/kafka-go"
)

// addEvent writes a single event to the MSK cluster via its brokers.
// RequestBodyType is defined elsewhere in our project; all it needs here
// is an Event string field, which we use as the message key.
func addEvent(ctx context.Context, requestBody RequestBodyType) (bool, error) {

    // Prepare dialer
    dialer := &kafka.Dialer{
        Timeout:   2 * time.Second,
        DualStack: true,
    }

    // Broker addresses come from the cluster's bootstrap broker string
    // (see the update at the bottom of this answer)
    brokers := []string{os.Getenv("KAFKA_BROKER_1"), os.Getenv("KAFKA_BROKER_2"), os.Getenv("KAFKA_BROKER_3"), os.Getenv("KAFKA_BROKER_4")}

    // Prepare writer config
    kafkaConfig := kafka.WriterConfig{
        Brokers:  brokers,
        Topic:    os.Getenv("KAFKA_TOPIC"),
        Balancer: &kafka.Hash{},
        Dialer:   dialer,
    }

    // Prepare writer and release its connections when done
    w := kafka.NewWriter(kafkaConfig)
    defer w.Close()

    // Convert struct to a JSON payload
    event, err := json.Marshal(requestBody)
    if err != nil {
        fmt.Println("Convert struct to json for writing to KAFKA failed")
        panic(err)
    }

    // Write message
    writeError := w.WriteMessages(ctx,
        kafka.Message{
            Key:   []byte(requestBody.Event),
            Value: event,
        },
    )
    if writeError != nil {
        fmt.Println("ERROR WRITING EVENT TO KAFKA")
        panic("could not write message " + writeError.Error())
    }

    return true, nil
}
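
For context, here is a minimal sketch of how addEvent could be wired into the Lambda entry point behind the postEvent HTTP endpoint defined below. It assumes the aws-lambda-go library; the handler shape and status codes are my assumptions, not code from our project:

package main

import (
    "context"
    "encoding/json"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
)

// handler parses the incoming API Gateway request body and hands it to
// addEvent (defined above, in the same package).
func handler(ctx context.Context, req events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
    var body RequestBodyType
    if err := json.Unmarshal([]byte(req.Body), &body); err != nil {
        return events.APIGatewayProxyResponse{StatusCode: 400, Body: "invalid request body"}, nil
    }

    if _, err := addEvent(ctx, body); err != nil {
        return events.APIGatewayProxyResponse{StatusCode: 500, Body: "could not write event"}, nil
    }

    return events.APIGatewayProxyResponse{StatusCode: 200, Body: "event accepted"}, nil
}

func main() {
    lambda.Start(handler)
}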

My serverless.yml:

The code above (addEvent) belongs to functions -> postEvent in serverless.yml. If you want to consume from Kafka, look at functions -> processEvent instead; consuming events is fairly simple (there's a sketch of a consumer handler right after the serverless.yml below), but setting everything up to produce to Kafka was crazy. We spent probably a month and a half on it and are still figuring out how everything should be configured. Sadly, Serverless doesn't do everything for you, so you have to "click around" manually in AWS, but compared to the other frameworks we tried, Serverless is still the best option right now.

provider:
  name: aws
  runtime: go1.x
  stage: dev
  profile: ${env:AWS_PROFILE}
  region: ${env:REGION}
  apiName: my-app-${sls:stage}
  lambdaHashingVersion: 20201221
  environment:
    ENV: ${env:ENV}
    KAFKA_TOPIC: ${env:KAFKA_TOPIC}
    KAFKA_BROKER_1: ${env:KAFKA_BROKER_1}
    KAFKA_BROKER_2: ${env:KAFKA_BROKER_2}
    KAFKA_BROKER_3: ${env:KAFKA_BROKER_3}
    KAFKA_BROKER_4: ${env:KAFKA_BROKER_4}
    KAFKA_ARN: ${env:KAFKA_ARN}
    ACCESS_CONTROL_ORIGINS: ${env:ACCESS_CONTROL_ORIGINS}
    ACCESS_CONTROL_HEADERS: ${env:ACCESS_CONTROL_HEADERS}
    ACCESS_CONTROL_METHODS: ${env:ACCESS_CONTROL_METHODS}
    BATCH_SIZE: ${env:BATCH_SIZE}
    SLACK_API_TOKEN: ${env:SLACK_API_TOKEN}
    SLACK_CHANNEL_ID: ${env:SLACK_CHANNEL_ID}
  httpApi:
    cors: true
  apiGateway:
    resourcePolicy:
      - Effect: Allow
        Action: '*'
        Resource: '*'
        Principal: '*'
  vpc:
    securityGroupIds:
      - sg-*********
    subnetIds:
      - subnet-******
      - subnet-*******

functions:
  postEvent:
    handler: bin/postEvent
    package:
      patterns:
        - bin/postEvent
    events:
      - http:
          path: event
          method: post
          cors:
            origin: ${env:ACCESS_CONTROL_ORIGINS}
            headers:
              - Content-Type
              - Content-Length
              - Accept-Encoding
              - Origin
              - Referer
              - Authorization
              - X-CSRF-Token
              - X-Amz-Date
              - X-Api-Key
              - X-Amz-Security-Token
              - X-Amz-User-Agent
            allowCredentials: false
            methods:
              - OPTIONS
              - POST
  processEvent:
    handler: bin/processEvent
    package:
      patterns:
        - bin/processEvent
    events:
      - msk:
          arn: ${env:KAFKA_ARN}
          topic: ${env:KAFKA_TOPIC}
          batchSize: ${env:BATCH_SIZE}
          startingPosition: LATEST
resources:
  Resources:
    GatewayResponseDefault4XX:
      Type: 'AWS::ApiGateway::GatewayResponse'
      Properties:
        ResponseParameters:
          gatewayresponse.header.Access-Control-Allow-Origin: "'*'"
          gatewayresponse.header.Access-Control-Allow-Headers: "'*'"
        ResponseType: DEFAULT_4XX
        RestApiId:
          Ref: 'ApiGatewayRestApi'
    myDefaultRole:
      Type: AWS::IAM::Role
      Properties:
        Path: /
        RoleName: my-app-dev-eu-serverless-lambdaRole-${sls:stage} # required if you want to use 'serverless deploy --function' later on
        AssumeRolePolicyDocument:
          Version: '2012-10-17'
          Statement:
            - Effect: Allow
              Principal:
                Service:
                  - lambda.amazonaws.com
              Action: sts:AssumeRole
        # note that these rights are needed if you want your function to be able to communicate with resources within your vpc
        ManagedPolicyArns:
          - arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
          - arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole
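
For the consuming side (processEvent), Lambda invokes your function with batches of records from the MSK event source configured above. Here is a minimal sketch of such a handler, assuming the events.KafkaEvent type from aws-lambda-go; note that record keys and values arrive base64-encoded:

package main

import (
    "context"
    "encoding/base64"
    "fmt"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
)

// handler iterates over the topic-partition batches delivered by the MSK
// event source mapping and decodes each record's payload.
func handler(ctx context.Context, event events.KafkaEvent) error {
    for topicPartition, records := range event.Records {
        for _, record := range records {
            value, err := base64.StdEncoding.DecodeString(record.Value)
            if err != nil {
                return fmt.Errorf("decoding record from %s: %w", topicPartition, err)
            }
            fmt.Printf("offset %d: %s\n", record.Offset, value)
        }
    }
    return nil
}

func main() {
    lambda.Start(handler)
}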

I have to warn you that we spent a lot of time figuring out how to set up the VPC and the other networking/permissions pieces correctly. My colleague will write a blog post about it once he's back from vacation. :) I hope this helps. Good luck ;)

Update

If you're using JavaScript, you would connect to Kafka like this:

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'order-app',
  brokers: [
    'broker1:port',
    'broker2:port',
  ],
  ssl: true, // or false, depending on whether your cluster requires TLS
})

The connection string, called the broker bootstrap string, can be found by making an API call: aws kafka get-bootstrap-brokers --cluster-arn ClusterArn

See an example here: https://docs.aws.amazon.com/msk/latest/developerguide/msk-get-bootstrap-brokers.html
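
If you'd rather fetch the broker string programmatically than via the CLI, here is a sketch using the AWS SDK for Go (v1); reusing the KAFKA_ARN environment variable from the serverless.yml above is my assumption:

package main

import (
    "fmt"
    "os"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/kafka"
)

func main() {
    sess := session.Must(session.NewSession())
    client := kafka.New(sess)

    // Equivalent to: aws kafka get-bootstrap-brokers --cluster-arn ...
    out, err := client.GetBootstrapBrokers(&kafka.GetBootstrapBrokersInput{
        ClusterArn: aws.String(os.Getenv("KAFKA_ARN")),
    })
    if err != nil {
        panic(err)
    }

    // Plaintext and TLS listener strings; use whichever matches your
    // cluster's client encryption settings.
    fmt.Println(aws.StringValue(out.BootstrapBrokerString))
    fmt.Println(aws.StringValue(out.BootstrapBrokerStringTls))
}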

And here is a step-by-step walkthrough of how to produce/consume data: https://docs.aws.amazon.com/msk/latest/developerguide/produce-consume.html