Spring Boot 与 Spring Integration AWS Kinesis 报错
spring boot and spring integration aws kinesis error
我已经搭建了 Spring Boot 和 spring-integration-aws(配合 Kinesis 使用),但应用程序启动时不断出现以下错误。
不知道我的配置对不对?
我的 maven pom.xml 正确吗?
如果 spring.io 能为 spring-integration 和 aws-kinesis 提供一个端到端的示例应用程序就好了。
这个错误一直在不断重复出现,如能帮我消除它,将不胜感激。
提前致谢。
2018-03-21 19:38:20.763 INFO 8417 --- [is-dispatcher-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception when describing stream [TestStream]. Backing off for [1000] millis.
com.amazonaws.services.kinesis.model.ResourceNotFoundException: Stream TestStream under account 000000000000 not found. (Service: AmazonKinesis; Status Code: 400; Error Code: ResourceNotFoundException; Request ID: 694ca3a0-2d3f-11e8-b393-0de7c3ab0fa7)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1630) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1302) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2388) ~[aws-java-sdk-kinesis-1.11.297.jar:na]
at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2364) ~[aws-java-sdk-kinesis-1.11.297.jar:na]
at com.amazonaws.services.kinesis.AmazonKinesisClient.executeDescribeStream(AmazonKinesisClient.java:754) ~[aws-java-sdk-kinesis-1.11.297.jar:na]
at com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:729) ~[aws-java-sdk-kinesis-1.11.297.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.lambda$populateShardsForStream$0(KinesisMessageDrivenChannelAdapter.java:467) ~[spring-integration-aws-2.0.0.BUILD-20180319.195200-12.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_91]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_91]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_91]
/**
 * Spring Integration configuration that consumes the Kinesis stream {@code "TestStream"}
 * through a {@link KinesisMessageDrivenChannelAdapter}.
 *
 * <p>The {@link AmazonKinesis} client defined here talks to {@code http://localhost:4567}
 * rather than to a regular AWS endpoint (see the kinesalite link in {@link #amazonKinesis()}).
 */
@Configuration
@EnableIntegration
public class ReceiverConfig {

    /** Port of the local endpoint the Kinesis client connects to. */
    private final int port = 4567;

    /**
     * Builds the inbound adapter for the {@code "TestStream"} stream.
     *
     * @param amazonKinesis client used by the adapter to talk to Kinesis
     * @return an adapter that sends records to {@link #kinesisReceiveChannel()} and errors to
     *         {@link #errorChannel()}
     */
    private KinesisMessageDrivenChannelAdapter kinesisMessageDrivenChannelAdapter(AmazonKinesis amazonKinesis) {
        KinesisMessageDrivenChannelAdapter adapter =
                new KinesisMessageDrivenChannelAdapter(amazonKinesis, "TestStream");
        adapter.setOutputChannel(kinesisReceiveChannel());
        adapter.setErrorMessageStrategy(new KinesisMessageHeaderErrorMessageStrategy());
        adapter.setErrorChannel(errorChannel());
        return adapter;
    }

    /**
     * Creates the Kinesis client pointed at {@code http://localhost:<port>}.
     *
     * <p>Side effect: sets the JVM-wide system property
     * {@code SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY} to {@code "true"} and
     * does not reset it afterwards.
     *
     * @return a client built with empty static credentials, no error retries and a
     *         connection timeout of {@code 1000}
     */
    @Bean
    public AmazonKinesis amazonKinesis() {
        String url = "http://localhost:" + this.port;

        // See https://github.com/mhart/kinesalite#cbor-protocol-issues-with-the-java-sdk
        System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");

        ClientConfiguration clientConfiguration = new ClientConfiguration()
                .withMaxErrorRetry(0)
                .withConnectionTimeout(1000);
        AwsClientBuilder.EndpointConfiguration endpoint =
                new AwsClientBuilder.EndpointConfiguration(url, Regions.DEFAULT_REGION.getName());

        AmazonKinesis amazonKinesis = AmazonKinesisAsyncClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("", "")))
                .withClientConfiguration(clientConfiguration)
                .withEndpointConfiguration(endpoint)
                .build();

        // The clearing call below is left disabled, so the property stays set.
        //System.clearProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY);
        return amazonKinesis;
    }

    /**
     * Exposes the Kinesis inbound adapter as a bean.
     *
     * @param amazonKinesis client injected by the container
     * @return the adapter created by {@code kinesisMessageDrivenChannelAdapter(AmazonKinesis)}
     */
    @Bean
    public KinesisMessageDrivenChannelAdapter kinesisInboundChannelChannel1(AmazonKinesis amazonKinesis) {
        return kinesisMessageDrivenChannelAdapter(amazonKinesis);
    }

    /**
     * Queue channel that receives the records read from the stream.
     *
     * @return a {@link QueueChannel} whose accepted datatype is {@link String}
     */
    @Bean
    public PollableChannel kinesisReceiveChannel() {
        QueueChannel queueChannel = new QueueChannel();
        queueChannel.setDatatypes(String.class);
        return queueChannel;
    }

    /**
     * Queue channel for error messages; an interceptor rethrows the failure carried by each
     * {@link ErrorMessage} after it has been sent.
     *
     * <p>Runtime exceptions and errors are rethrown unchanged. Any other {@link Throwable} is
     * wrapped in an {@link IllegalStateException} (with the original as its cause) instead of
     * being blindly cast, which previously produced a {@code ClassCastException} that hid
     * the real failure.
     *
     * @return the intercepted {@link QueueChannel}
     */
    @Bean
    public PollableChannel errorChannel() {
        QueueChannel queueChannel = new QueueChannel();
        queueChannel.addInterceptor(new ChannelInterceptorAdapter() {

            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                super.postSend(message, channel, sent);
                if (message instanceof ErrorMessage) {
                    Throwable failure = ((ErrorMessage) message).getPayload();
                    if (failure instanceof RuntimeException) {
                        throw (RuntimeException) failure;
                    }
                    if (failure instanceof Error) {
                        throw (Error) failure;
                    }
                    // Checked exceptions cannot be thrown from postSend; keep them as the cause.
                    throw new IllegalStateException(failure);
                }
            }
        });
        return queueChannel;
    }
}
/**
 * Message endpoint that prints every payload taken from the channel named
 * {@code kinesisReceiveChannel}.
 */
@Component
public class EventProcessor {

    /**
     * Writes the received event to standard output, prefixed with {@code "event: "}.
     * The input channel is polled with a fixed rate of {@code "1000"}.
     *
     * @param event payload of the message received from the channel
     */
    @ServiceActivator(inputChannel = "kinesisReceiveChannel", poller = @Poller(fixedRate = "1000"))
    public void onEvent(String event) {
        final String line = "event: " + event;
        System.out.println(line);
    }
}
/**
 * Spring Boot entry point. After startup it issues a single {@code listStreams()} call
 * against the injected {@link AmazonKinesis} client.
 */
@SpringBootApplication
public class KreceiverApplication implements CommandLineRunner {

    /** Name of the test stream; not referenced anywhere else in this class. */
    private static final String TEST_STREAM = "TestStream";

    private static Log logger = LogFactory.getLog(KreceiverApplication.class);

    /** Kinesis client supplied by the application context. */
    @Autowired
    private AmazonKinesis amazonKinesis;

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments passed through to {@link SpringApplication}
     */
    public static void main(String[] args) {
        SpringApplication.run(KreceiverApplication.class, args);
    }

    /**
     * Calls {@code listStreams()} on the client and discards the result. An
     * {@link SdkClientException} is logged as a warning instead of being propagated.
     *
     * @param args command-line arguments (unused)
     * @throws Exception declared by the overridden method
     */
    @Override
    public void run(String... args) throws Exception {
        try {
            this.amazonKinesis.listStreams();
        }
        catch (SdkClientException ex) {
            logger.warn("Tests not running because no Kinesis on ...", ex);
        }
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the "kreceiver" application (jar packaging). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>kreceiver</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kreceiver</name>
<description>Demo project for Spring Boot</description>
<!-- Spring Boot 2.0.0.RELEASE parent; the Spring Boot starters below declare no version of their own. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- Encodings, Java level, and the versions referenced by the dependency declarations below. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-cloud-aws.version>1.2.2.RELEASE</spring-cloud-aws.version>
<spring-integration-aws-version>2.0.0.BUILD-SNAPSHOT</spring-integration-aws-version>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- NOTE(review): resolves to a BUILD-SNAPSHOT version, and this file declares no <repositories> section; presumably a snapshot repository is configured elsewhere (verify). -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-aws</artifactId>
<version>${spring-integration-aws-version}</version>
</dependency>
<!-- NOTE(review): declared as an ordinary dependency; the "-dependencies" suffix suggests a BOM that would normally be imported through <dependencyManagement> (confirm before changing, since importing it may also change the managed AWS SDK versions). -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-aws-dependencies</artifactId>
<version>${spring-cloud-aws.version}</version>
</dependency>
<!-- AWS SDK Kinesis module, pinned to an explicit version. -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
<version>1.11.297</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
根据您在 amazonKinesis() 中的注释,您似乎是要针对 kinesalite 测试您的解决方案。而堆栈跟踪中出现了如下错误:
com.amazonaws.services.kinesis.model.ResourceNotFoundException: Stream TestStream under account 000000000000 not found.
这确实表明该本地 kinesalite 实例中不存在 TestStream 流。
不确定您想从 Spring IO 那里听到什么,但您所遇到的是使用 AWS Kinesis 本身的概念错误:流必须在 Kinesis 中存在才能开始使用它。这正是任何 AWS 资源的规则。它不会按需创建。
你当然可以自己做。这正是我们在集成测试中所做的:
KINESIS_LOCAL_RUNNING.getKinesis().createStream(TEST_STREAM, 1);
我建议你添加这样的代码:
this.amazonKinesis.createStream(TEST_STREAM);
到您的 run(String... args) 方法中。
我已经搭建了 Spring Boot 和 spring-integration-aws(配合 Kinesis 使用),但应用程序启动时不断出现以下错误。
不知道我的配置对不对?
我的 maven pom.xml 正确吗?
如果 spring.io 能为 spring-integration 和 aws-kinesis 提供一个端到端的示例应用程序就好了。
这个错误一直在不断重复出现,如能帮我消除它,将不胜感激。
提前致谢。
2018-03-21 19:38:20.763 INFO 8417 --- [is-dispatcher-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception when describing stream [TestStream]. Backing off for [1000] millis.
com.amazonaws.services.kinesis.model.ResourceNotFoundException: Stream TestStream under account 000000000000 not found. (Service: AmazonKinesis; Status Code: 400; Error Code: ResourceNotFoundException; Request ID: 694ca3a0-2d3f-11e8-b393-0de7c3ab0fa7)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1630) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1302) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2388) ~[aws-java-sdk-kinesis-1.11.297.jar:na]
at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2364) ~[aws-java-sdk-kinesis-1.11.297.jar:na]
at com.amazonaws.services.kinesis.AmazonKinesisClient.executeDescribeStream(AmazonKinesisClient.java:754) ~[aws-java-sdk-kinesis-1.11.297.jar:na]
at com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:729) ~[aws-java-sdk-kinesis-1.11.297.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.lambda$populateShardsForStream$0(KinesisMessageDrivenChannelAdapter.java:467) ~[spring-integration-aws-2.0.0.BUILD-20180319.195200-12.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_91]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_91]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_91]
/**
 * Spring Integration configuration that consumes the Kinesis stream {@code "TestStream"}
 * through a {@link KinesisMessageDrivenChannelAdapter}.
 *
 * <p>The {@link AmazonKinesis} client defined here talks to {@code http://localhost:4567}
 * rather than to a regular AWS endpoint (see the kinesalite link in {@link #amazonKinesis()}).
 */
@Configuration
@EnableIntegration
public class ReceiverConfig {

    /** Port of the local endpoint the Kinesis client connects to. */
    private final int port = 4567;

    /**
     * Builds the inbound adapter for the {@code "TestStream"} stream.
     *
     * @param amazonKinesis client used by the adapter to talk to Kinesis
     * @return an adapter that sends records to {@link #kinesisReceiveChannel()} and errors to
     *         {@link #errorChannel()}
     */
    private KinesisMessageDrivenChannelAdapter kinesisMessageDrivenChannelAdapter(AmazonKinesis amazonKinesis) {
        KinesisMessageDrivenChannelAdapter adapter =
                new KinesisMessageDrivenChannelAdapter(amazonKinesis, "TestStream");
        adapter.setOutputChannel(kinesisReceiveChannel());
        adapter.setErrorMessageStrategy(new KinesisMessageHeaderErrorMessageStrategy());
        adapter.setErrorChannel(errorChannel());
        return adapter;
    }

    /**
     * Creates the Kinesis client pointed at {@code http://localhost:<port>}.
     *
     * <p>Side effect: sets the JVM-wide system property
     * {@code SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY} to {@code "true"} and
     * does not reset it afterwards.
     *
     * @return a client built with empty static credentials, no error retries and a
     *         connection timeout of {@code 1000}
     */
    @Bean
    public AmazonKinesis amazonKinesis() {
        String url = "http://localhost:" + this.port;

        // See https://github.com/mhart/kinesalite#cbor-protocol-issues-with-the-java-sdk
        System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");

        ClientConfiguration clientConfiguration = new ClientConfiguration()
                .withMaxErrorRetry(0)
                .withConnectionTimeout(1000);
        AwsClientBuilder.EndpointConfiguration endpoint =
                new AwsClientBuilder.EndpointConfiguration(url, Regions.DEFAULT_REGION.getName());

        AmazonKinesis amazonKinesis = AmazonKinesisAsyncClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("", "")))
                .withClientConfiguration(clientConfiguration)
                .withEndpointConfiguration(endpoint)
                .build();

        // The clearing call below is left disabled, so the property stays set.
        //System.clearProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY);
        return amazonKinesis;
    }

    /**
     * Exposes the Kinesis inbound adapter as a bean.
     *
     * @param amazonKinesis client injected by the container
     * @return the adapter created by {@code kinesisMessageDrivenChannelAdapter(AmazonKinesis)}
     */
    @Bean
    public KinesisMessageDrivenChannelAdapter kinesisInboundChannelChannel1(AmazonKinesis amazonKinesis) {
        return kinesisMessageDrivenChannelAdapter(amazonKinesis);
    }

    /**
     * Queue channel that receives the records read from the stream.
     *
     * @return a {@link QueueChannel} whose accepted datatype is {@link String}
     */
    @Bean
    public PollableChannel kinesisReceiveChannel() {
        QueueChannel queueChannel = new QueueChannel();
        queueChannel.setDatatypes(String.class);
        return queueChannel;
    }

    /**
     * Queue channel for error messages; an interceptor rethrows the failure carried by each
     * {@link ErrorMessage} after it has been sent.
     *
     * <p>Runtime exceptions and errors are rethrown unchanged. Any other {@link Throwable} is
     * wrapped in an {@link IllegalStateException} (with the original as its cause) instead of
     * being blindly cast, which previously produced a {@code ClassCastException} that hid
     * the real failure.
     *
     * @return the intercepted {@link QueueChannel}
     */
    @Bean
    public PollableChannel errorChannel() {
        QueueChannel queueChannel = new QueueChannel();
        queueChannel.addInterceptor(new ChannelInterceptorAdapter() {

            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                super.postSend(message, channel, sent);
                if (message instanceof ErrorMessage) {
                    Throwable failure = ((ErrorMessage) message).getPayload();
                    if (failure instanceof RuntimeException) {
                        throw (RuntimeException) failure;
                    }
                    if (failure instanceof Error) {
                        throw (Error) failure;
                    }
                    // Checked exceptions cannot be thrown from postSend; keep them as the cause.
                    throw new IllegalStateException(failure);
                }
            }
        });
        return queueChannel;
    }
}
/**
 * Message endpoint that prints every payload taken from the channel named
 * {@code kinesisReceiveChannel}.
 */
@Component
public class EventProcessor {

    /**
     * Writes the received event to standard output, prefixed with {@code "event: "}.
     * The input channel is polled with a fixed rate of {@code "1000"}.
     *
     * @param event payload of the message received from the channel
     */
    @ServiceActivator(inputChannel = "kinesisReceiveChannel", poller = @Poller(fixedRate = "1000"))
    public void onEvent(String event) {
        final String line = "event: " + event;
        System.out.println(line);
    }
}
/**
 * Spring Boot entry point. After startup it issues a single {@code listStreams()} call
 * against the injected {@link AmazonKinesis} client.
 */
@SpringBootApplication
public class KreceiverApplication implements CommandLineRunner {

    /** Name of the test stream; not referenced anywhere else in this class. */
    private static final String TEST_STREAM = "TestStream";

    private static Log logger = LogFactory.getLog(KreceiverApplication.class);

    /** Kinesis client supplied by the application context. */
    @Autowired
    private AmazonKinesis amazonKinesis;

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments passed through to {@link SpringApplication}
     */
    public static void main(String[] args) {
        SpringApplication.run(KreceiverApplication.class, args);
    }

    /**
     * Calls {@code listStreams()} on the client and discards the result. An
     * {@link SdkClientException} is logged as a warning instead of being propagated.
     *
     * @param args command-line arguments (unused)
     * @throws Exception declared by the overridden method
     */
    @Override
    public void run(String... args) throws Exception {
        try {
            this.amazonKinesis.listStreams();
        }
        catch (SdkClientException ex) {
            logger.warn("Tests not running because no Kinesis on ...", ex);
        }
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the "kreceiver" application (jar packaging). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>kreceiver</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kreceiver</name>
<description>Demo project for Spring Boot</description>
<!-- Spring Boot 2.0.0.RELEASE parent; the Spring Boot starters below declare no version of their own. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- Encodings, Java level, and the versions referenced by the dependency declarations below. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-cloud-aws.version>1.2.2.RELEASE</spring-cloud-aws.version>
<spring-integration-aws-version>2.0.0.BUILD-SNAPSHOT</spring-integration-aws-version>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- NOTE(review): resolves to a BUILD-SNAPSHOT version, and this file declares no <repositories> section; presumably a snapshot repository is configured elsewhere (verify). -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-aws</artifactId>
<version>${spring-integration-aws-version}</version>
</dependency>
<!-- NOTE(review): declared as an ordinary dependency; the "-dependencies" suffix suggests a BOM that would normally be imported through <dependencyManagement> (confirm before changing, since importing it may also change the managed AWS SDK versions). -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-aws-dependencies</artifactId>
<version>${spring-cloud-aws.version}</version>
</dependency>
<!-- AWS SDK Kinesis module, pinned to an explicit version. -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
<version>1.11.297</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
根据您在 amazonKinesis() 中的注释,您似乎是要针对 kinesalite 测试您的解决方案。而堆栈跟踪中出现了如下错误:
com.amazonaws.services.kinesis.model.ResourceNotFoundException: Stream TestStream under account 000000000000 not found.
这确实表明该本地 kinesalite 实例中不存在 TestStream 流。
不确定您想从 Spring IO 那里听到什么,但您所遇到的是使用 AWS Kinesis 本身的概念错误:流必须在 Kinesis 中存在才能开始使用它。这正是任何 AWS 资源的规则。它不会按需创建。
你当然可以自己做。这正是我们在集成测试中所做的:
KINESIS_LOCAL_RUNNING.getKinesis().createStream(TEST_STREAM, 1);
我建议你添加这样的代码:
this.amazonKinesis.createStream(TEST_STREAM);
到您的 run(String... args) 方法中。