连接到 Amazon MSK 集群
Connect to Amazon MSK cluster
我正在尝试设置 Amazon MSK 集群并通过 lambda 函数连接到它。 lambda 函数将成为消息的生产者,而不是消费者。
我正在使用 serverless 框架来配置所有内容,并且在我的 serverless.yml 中添加了以下内容并且似乎工作正常。
MSK:
Type: AWS::MSK::Cluster
Properties:
ClusterName: kafkaOne
KafkaVersion: 2.2.1
NumberOfBrokerNodes: 3
BrokerNodeGroupInfo:
InstanceType: kafka.t3.small
ClientSubnets:
- Ref: PrivateSubnet1
- Ref: PrivateSubnet2
- Ref: PrivateSubnet3
但是当尝试连接到该集群以实际发送消息时,我不确定如何在此处获取连接字符串?我认为它应该是 ZookeeperConnectString?
我是 kafka/msk 的新手,所以我可能没有看到明显的东西。
非常感谢任何建议。干杯。
我不知道你用的是哪种代码库,所以我将添加我在 GO 中编写的代码。
本质上,您应该像连接到某个独立的 Kafka 实例一样连接到 MSK 集群。我们正在使用 brokers 进行“连接”,或者更准确地说是写入 MSK 集群。
我正在使用 segmentio/kafka-go 库。我向 MSK 集群发送事件的函数如下所示
// Add event
func addEvent(ctx context.Context, requestBody RequestBodyType) (bool, error) {
// Prepare dialer
dialer := &kafka.Dialer{
Timeout: 2 * time.Second,
DualStack: true,
}
brokers := []string{os.Getenv("KAFKA_BROKER_1"), os.Getenv("KAFKA_BROKER_2"), os.Getenv("KAFKA_BROKER_3"), os.Getenv("KAFKA_BROKER_4")}
// Prepare writer config
kafkaConfig := kafka.WriterConfig{
Brokers: brokers,
Topic: os.Getenv("KAFKA_TOPIC"),
Balancer: &kafka.Hash{},
Dialer: dialer,
}
// Prepare writer
w := kafka.NewWriter(kafkaConfig)
// Convert struct to json string
event, err := json.Marshal(requestBody)
if err != nil {
fmt.Println("Convert struct to json for writing to KAFKA failed")
panic(err)
}
// Write message
writeError := w.WriteMessages(ctx,
kafka.Message{
Key: []byte(requestBody.Event),
Value: []byte(event),
},
)
if writeError != nil {
fmt.Println("ERROR WRITING EVENT TO KAFKA")
panic("could not write message " + err.Error())
}
return true, nil
}
我的serverless.yml
上层代码(addEvent)属于functions -> postEvent in serverless.yml... 如果你从kafka消费,那么你应该检查functions -> processEvent。消费事件相当简单,但是为向 Kafka 生产而设置一切却很疯狂。我们可能为此工作了一个半月,并且仍在弄清楚应该如何设置所有内容。遗憾的是,无服务器并不能为你做所有事情,所以你必须在 AWS 中手动“点击”,但我们与其他框架相比,无服务器仍然是目前最好的
provider:
name: aws
runtime: go1.x
stage: dev
profile: ${env:AWS_PROFILE}
region: ${env:REGION}
apiName: my-app-${sls:stage}
lambdaHashingVersion: 20201221
environment:
ENV: ${env:ENV}
KAFKA_TOPIC: ${env:KAFKA_TOPIC}
KAFKA_BROKER_1: ${env:KAFKA_BROKER_1}
KAFKA_BROKER_2: ${env:KAFKA_BROKER_2}
KAFKA_BROKER_3: ${env:KAFKA_BROKER_3}
KAFKA_BROKER_4: ${env:KAFKA_BROKER_4}
KAFKA_ARN: ${env:KAFKA_ARN}
ACCESS_CONTROL_ORIGINS: ${env:ACCESS_CONTROL_ORIGINS}
ACCESS_CONTROL_HEADERS: ${env:ACCESS_CONTROL_HEADERS}
ACCESS_CONTROL_METHODS: ${env:ACCESS_CONTROL_METHODS}
BATCH_SIZE: ${env:BATCH_SIZE}
SLACK_API_TOKEN: ${env:SLACK_API_TOKEN}
SLACK_CHANNEL_ID: ${env:SLACK_CHANNEL_ID}
httpApi:
cors: true
apiGateway:
resourcePolicy:
- Effect: Allow
Action: '*'
Resource: '*'
Principal: '*'
vpc:
securityGroupIds:
- sg-*********
subnetIds:
- subnet-******
- subnet-*******
functions:
postEvent:
handler: bin/postEvent
package:
patterns:
- bin/postEvent
events:
- http:
path: event
method: post
cors:
origin: ${env:ACCESS_CONTROL_ORIGINS}
headers:
- Content-Type
- Content-Length
- Accept-Encoding
- Origin
- Referer
- Authorization
- X-CSRF-Token
- X-Amz-Date
- X-Api-Key
- X-Amz-Security-Token
- X-Amz-User-Agent
allowCredentials: false
methods:
- OPTIONS
- POST
processEvent:
handler: bin/processEvent
package:
patterns:
- bin/processEvent
events:
- msk:
arn: ${env:KAFKA_ARN}
topic: ${env:KAFKA_TOPIC}
batchSize: ${env:BATCH_SIZE}
startingPosition: LATEST
resources:
Resources:
GatewayResponseDefault4XX:
Type: 'AWS::ApiGateway::GatewayResponse'
Properties:
ResponseParameters:
gatewayresponse.header.Access-Control-Allow-Origin: "'*'"
gatewayresponse.header.Access-Control-Allow-Headers: "'*'"
ResponseType: DEFAULT_4XX
RestApiId:
Ref: 'ApiGatewayRestApi'
myDefaultRole:
Type: AWS::IAM::Role
Properties:
Path: /
RoleName: my-app-dev-eu-serverless-lambdaRole-${sls:stage} # required if you want to use 'serverless deploy --function' later on
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action: sts:AssumeRole
# note that these rights are needed if you want your function to be able to communicate with resources within your vpc
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
- arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole
我必须警告你,我们花了很多时间来弄清楚如何正确设置 VPC 和其他网络/权限的东西。他度假回来后,我的拼贴画将写博客 post。 :) 我希望这对您有所帮助。祝你好运;)
更新
如果您正在使用 javascript,那么您将像这样连接到 Kafka
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'order-app',
brokers: [
'broker1:port',
'broker2:port',
],
ssl: true, // false
})
可以在我进行 API 调用时找到称为代理 bootstrap 字符串的连接字符串 aws kafka get-bootstrap-brokers --cluster-arn ClusterArn
参见此处示例:https://docs.aws.amazon.com/msk/latest/developerguide/msk-get-bootstrap-brokers.html
此外,这里还逐步介绍了如何 produce/consume 数据:https://docs.aws.amazon.com/msk/latest/developerguide/produce-consume.html
我正在尝试设置 Amazon MSK 集群并通过 lambda 函数连接到它。 lambda 函数将成为消息的生产者,而不是消费者。
我正在使用 serverless 框架来配置所有内容,并且在我的 serverless.yml 中添加了以下内容并且似乎工作正常。
MSK:
Type: AWS::MSK::Cluster
Properties:
ClusterName: kafkaOne
KafkaVersion: 2.2.1
NumberOfBrokerNodes: 3
BrokerNodeGroupInfo:
InstanceType: kafka.t3.small
ClientSubnets:
- Ref: PrivateSubnet1
- Ref: PrivateSubnet2
- Ref: PrivateSubnet3
但是当尝试连接到该集群以实际发送消息时,我不确定如何在此处获取连接字符串?我认为它应该是 ZookeeperConnectString? 我是 kafka/msk 的新手,所以我可能没有看到明显的东西。
非常感谢任何建议。干杯。
我不知道你用的是哪种代码库,所以我将添加我在 GO 中编写的代码。
本质上,您应该像连接到某个独立的 Kafka 实例一样连接到 MSK 集群。我们正在使用 brokers 进行“连接”,或者更准确地说是写入 MSK 集群。
我正在使用 segmentio/kafka-go 库。我向 MSK 集群发送事件的函数如下所示
// Add event
func addEvent(ctx context.Context, requestBody RequestBodyType) (bool, error) {
// Prepare dialer
dialer := &kafka.Dialer{
Timeout: 2 * time.Second,
DualStack: true,
}
brokers := []string{os.Getenv("KAFKA_BROKER_1"), os.Getenv("KAFKA_BROKER_2"), os.Getenv("KAFKA_BROKER_3"), os.Getenv("KAFKA_BROKER_4")}
// Prepare writer config
kafkaConfig := kafka.WriterConfig{
Brokers: brokers,
Topic: os.Getenv("KAFKA_TOPIC"),
Balancer: &kafka.Hash{},
Dialer: dialer,
}
// Prepare writer
w := kafka.NewWriter(kafkaConfig)
// Convert struct to json string
event, err := json.Marshal(requestBody)
if err != nil {
fmt.Println("Convert struct to json for writing to KAFKA failed")
panic(err)
}
// Write message
writeError := w.WriteMessages(ctx,
kafka.Message{
Key: []byte(requestBody.Event),
Value: []byte(event),
},
)
if writeError != nil {
fmt.Println("ERROR WRITING EVENT TO KAFKA")
panic("could not write message " + err.Error())
}
return true, nil
}
我的serverless.yml
上层代码(addEvent)属于functions -> postEvent in serverless.yml... 如果你从kafka消费,那么你应该检查functions -> processEvent。消费事件相当简单,但是为向 Kafka 生产而设置一切却很疯狂。我们可能为此工作了一个半月,并且仍在弄清楚应该如何设置所有内容。遗憾的是,无服务器并不能为你做所有事情,所以你必须在 AWS 中手动“点击”,但我们与其他框架相比,无服务器仍然是目前最好的
provider:
name: aws
runtime: go1.x
stage: dev
profile: ${env:AWS_PROFILE}
region: ${env:REGION}
apiName: my-app-${sls:stage}
lambdaHashingVersion: 20201221
environment:
ENV: ${env:ENV}
KAFKA_TOPIC: ${env:KAFKA_TOPIC}
KAFKA_BROKER_1: ${env:KAFKA_BROKER_1}
KAFKA_BROKER_2: ${env:KAFKA_BROKER_2}
KAFKA_BROKER_3: ${env:KAFKA_BROKER_3}
KAFKA_BROKER_4: ${env:KAFKA_BROKER_4}
KAFKA_ARN: ${env:KAFKA_ARN}
ACCESS_CONTROL_ORIGINS: ${env:ACCESS_CONTROL_ORIGINS}
ACCESS_CONTROL_HEADERS: ${env:ACCESS_CONTROL_HEADERS}
ACCESS_CONTROL_METHODS: ${env:ACCESS_CONTROL_METHODS}
BATCH_SIZE: ${env:BATCH_SIZE}
SLACK_API_TOKEN: ${env:SLACK_API_TOKEN}
SLACK_CHANNEL_ID: ${env:SLACK_CHANNEL_ID}
httpApi:
cors: true
apiGateway:
resourcePolicy:
- Effect: Allow
Action: '*'
Resource: '*'
Principal: '*'
vpc:
securityGroupIds:
- sg-*********
subnetIds:
- subnet-******
- subnet-*******
functions:
postEvent:
handler: bin/postEvent
package:
patterns:
- bin/postEvent
events:
- http:
path: event
method: post
cors:
origin: ${env:ACCESS_CONTROL_ORIGINS}
headers:
- Content-Type
- Content-Length
- Accept-Encoding
- Origin
- Referer
- Authorization
- X-CSRF-Token
- X-Amz-Date
- X-Api-Key
- X-Amz-Security-Token
- X-Amz-User-Agent
allowCredentials: false
methods:
- OPTIONS
- POST
processEvent:
handler: bin/processEvent
package:
patterns:
- bin/processEvent
events:
- msk:
arn: ${env:KAFKA_ARN}
topic: ${env:KAFKA_TOPIC}
batchSize: ${env:BATCH_SIZE}
startingPosition: LATEST
resources:
Resources:
GatewayResponseDefault4XX:
Type: 'AWS::ApiGateway::GatewayResponse'
Properties:
ResponseParameters:
gatewayresponse.header.Access-Control-Allow-Origin: "'*'"
gatewayresponse.header.Access-Control-Allow-Headers: "'*'"
ResponseType: DEFAULT_4XX
RestApiId:
Ref: 'ApiGatewayRestApi'
myDefaultRole:
Type: AWS::IAM::Role
Properties:
Path: /
RoleName: my-app-dev-eu-serverless-lambdaRole-${sls:stage} # required if you want to use 'serverless deploy --function' later on
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action: sts:AssumeRole
# note that these rights are needed if you want your function to be able to communicate with resources within your vpc
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
- arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole
我必须警告你,我们花了很多时间来弄清楚如何正确设置 VPC 和其他网络/权限的东西。他度假回来后,我的拼贴画将写博客 post。 :) 我希望这对您有所帮助。祝你好运;)
更新
如果您正在使用 javascript,那么您将像这样连接到 Kafka
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'order-app',
brokers: [
'broker1:port',
'broker2:port',
],
ssl: true, // false
})
可以在我进行 API 调用时找到称为代理 bootstrap 字符串的连接字符串 aws kafka get-bootstrap-brokers --cluster-arn ClusterArn
参见此处示例:https://docs.aws.amazon.com/msk/latest/developerguide/msk-get-bootstrap-brokers.html
此外,这里还逐步介绍了如何 produce/consume 数据:https://docs.aws.amazon.com/msk/latest/developerguide/produce-consume.html