AWS Typescript CDK,尝试将运动流作为源添加到 firehose
AWS Typescript CDK, Trying to add kinesis stream as a source to firehose
我正在尝试执行此流程:
DynamoDB -> Kinesis 数据流 -> Kinesis firehose -> S3 -> Redshift
我在控制台中尝试了这个并且一切正常,但是当我尝试使用 was typescript CDK 编写代码时,我收到了这个错误
Resource handler returned message: "KinesisSourceStreamConfig is only applicable for KinesisStreamAsSource stream type. (Service: Firehose, Status Code: 400, Request ID: c3b3
9dc4-c6a7-3c31-9a00-68c863659e53, Extended Request ID: bgcK523kXRumygU0/xX5cjDlUZgF6LLwSYo7sqyfQ2GIjzW9kJGWY3ZFm9pHoZlPuLanXnLlSqOr3xhyfHIEW7GYONByP5Fe)" (RequestToken: 1c9c1
56b-c2a4-9600-28f5-52af5664e3bb, HandlerErrorCode: InvalidRequest)
这是我要实现的代码:
import * as cdk from '@aws-cdk/core';
import * as s3 from '@aws-cdk/aws-s3';
import * as dynamodb from '@aws-cdk/aws-dynamodb';
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as firehose from '@aws-cdk/aws-kinesisfirehose';
import * as iam from "@aws-cdk/aws-iam";
import { CfnResource } from '@aws-cdk/core';
const DynamoDBArn = 'arn:aws:dynamodb:us-east-1:207627709836:table/blog-srsa-ddb-table';
export class DynamoStreamer extends cdk.Stack {
constructor(
scope: cdk.Construct,
id: string
) {
super(scope, id);
const bucket = new s3.Bucket(this, 'data-lake', {
publicReadAccess: false
});
const dataStream=new kinesis.Stream(this,"dataStream",{shardCount:1,streamName:"stevensu_cdk_stream"})
const myTable = new dynamodb.Table(this, 'dynamo-table', {
tableName: 'blog-srsa-ddb-table2',
partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING },
sortKey: { name: 'name', type: dynamodb.AttributeType.STRING },
removalPolicy: cdk.RemovalPolicy.RETAIN,
kinesisStream: dataStream
});
const deliveryStreamRole = new iam.Role(this, `DeliveryStreamRole`, {
assumedBy: new iam.ServicePrincipal("firehose.amazonaws.com"),
});
const firehoseDeliveryStream = new firehose.CfnDeliveryStream(this, 'Delivery Stream', {
redshiftDestinationConfiguration: {
clusterJdbcurl: 'jdbc:redshift://redshift-cluster-1.cicrdvtr7ouk.us-east-1.redshift.amazonaws.com:5439/dev',
username: 'admin',
password: '2131636+aA',
roleArn: 'arn:aws:iam::207627709836:role/service-role/KinesisFirehoseServiceRole-KDS-RED-vTWqG-us-east-1-1630965194702',
copyCommand: {
dataTableName: 'users_test',
copyOptions: 'format as json \'auto\''
},
s3Configuration: {
bucketArn: bucket.bucketArn,
roleArn: deliveryStreamRole.roleArn
},
},
kinesisStreamSourceConfiguration: {
kinesisStreamArn: dataStream.streamArn,
roleArn: deliveryStreamRole.roleArn
},
});
}
}
错误发生在这部分代码中:
kinesisStreamSourceConfiguration: {
kinesisStreamArn: dataStream.streamArn,
roleArn: deliveryStreamRole.roleArn
},
不确定更好的方法,但错误来自于您需要在 CfnDeliveryStream 实例中包含 deliveryStreamType
键并将其设置为 kinesisStreamAsSource
.
我正在尝试执行此流程:
DynamoDB -> Kinesis 数据流 -> Kinesis firehose -> S3 -> Redshift
我在控制台中尝试了这个并且一切正常,但是当我尝试使用 was typescript CDK 编写代码时,我收到了这个错误
Resource handler returned message: "KinesisSourceStreamConfig is only applicable for KinesisStreamAsSource stream type. (Service: Firehose, Status Code: 400, Request ID: c3b3
9dc4-c6a7-3c31-9a00-68c863659e53, Extended Request ID: bgcK523kXRumygU0/xX5cjDlUZgF6LLwSYo7sqyfQ2GIjzW9kJGWY3ZFm9pHoZlPuLanXnLlSqOr3xhyfHIEW7GYONByP5Fe)" (RequestToken: 1c9c1
56b-c2a4-9600-28f5-52af5664e3bb, HandlerErrorCode: InvalidRequest)
这是我要实现的代码:
import * as cdk from '@aws-cdk/core';
import * as s3 from '@aws-cdk/aws-s3';
import * as dynamodb from '@aws-cdk/aws-dynamodb';
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as firehose from '@aws-cdk/aws-kinesisfirehose';
import * as iam from "@aws-cdk/aws-iam";
import { CfnResource } from '@aws-cdk/core';
const DynamoDBArn = 'arn:aws:dynamodb:us-east-1:207627709836:table/blog-srsa-ddb-table';
export class DynamoStreamer extends cdk.Stack {
constructor(
scope: cdk.Construct,
id: string
) {
super(scope, id);
const bucket = new s3.Bucket(this, 'data-lake', {
publicReadAccess: false
});
const dataStream=new kinesis.Stream(this,"dataStream",{shardCount:1,streamName:"stevensu_cdk_stream"})
const myTable = new dynamodb.Table(this, 'dynamo-table', {
tableName: 'blog-srsa-ddb-table2',
partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING },
sortKey: { name: 'name', type: dynamodb.AttributeType.STRING },
removalPolicy: cdk.RemovalPolicy.RETAIN,
kinesisStream: dataStream
});
const deliveryStreamRole = new iam.Role(this, `DeliveryStreamRole`, {
assumedBy: new iam.ServicePrincipal("firehose.amazonaws.com"),
});
const firehoseDeliveryStream = new firehose.CfnDeliveryStream(this, 'Delivery Stream', {
redshiftDestinationConfiguration: {
clusterJdbcurl: 'jdbc:redshift://redshift-cluster-1.cicrdvtr7ouk.us-east-1.redshift.amazonaws.com:5439/dev',
username: 'admin',
password: '2131636+aA',
roleArn: 'arn:aws:iam::207627709836:role/service-role/KinesisFirehoseServiceRole-KDS-RED-vTWqG-us-east-1-1630965194702',
copyCommand: {
dataTableName: 'users_test',
copyOptions: 'format as json \'auto\''
},
s3Configuration: {
bucketArn: bucket.bucketArn,
roleArn: deliveryStreamRole.roleArn
},
},
kinesisStreamSourceConfiguration: {
kinesisStreamArn: dataStream.streamArn,
roleArn: deliveryStreamRole.roleArn
},
});
}
}
错误发生在这部分代码中:
kinesisStreamSourceConfiguration: {
kinesisStreamArn: dataStream.streamArn,
roleArn: deliveryStreamRole.roleArn
},
不确定更好的方法,但错误来自于您需要在 CfnDeliveryStream 实例中包含 deliveryStreamType
键并将其设置为 kinesisStreamAsSource
.