Can't poll from S3 with a custom endpoint configuration
I'm trying to use spring-integration-aws to poll an S3 bucket in order to trigger a spring-batch job. My S3 bucket is not hosted on Amazon but on a local minio server, so I have a custom configuration:
@Bean
public AmazonS3 amazonS3(ConfigProperties configProperties) {
    return AmazonS3ClientBuilder.standard()
            .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:9001", "eu-west-0")) // Region matches the minio region
            .withPathStyleAccessEnabled(configProperties.getS3().isPathStyle())
            .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
                    configProperties.getS3().getAccessKey(), configProperties.getS3().getSecretKey()
            ))).build();
}
This is how I define the IntegrationFlow:
@Bean
public IntegrationFlow s3InboundFlow() {
    S3RemoteFileTemplate template = new S3RemoteFileTemplate(new S3SessionFactory(amazonS3));
    S3StreamingMessageSource s3StreamingMessageSource = new S3StreamingMessageSource(template);
    s3StreamingMessageSource.setRemoteDirectory(String.format("%s/OUT/", configProperties.getS3().getBucketDataPath()));
    return IntegrationFlows.from(s3StreamingMessageSource, configurer -> configurer
                    .id("s3InboundAdapter")
                    .autoStartup(true)
                    .poller(Pollers.fixedDelay(POLL, TimeUnit.SECONDS)))
            .handle(jobLaunchingGateway(jobRepository)) // Launch a spring-batch job
            .get();
}
The problem is that when polling occurs, I get the following error:
2020-03-30 19:05:21,008 ERROR [scheduling-1] org.springframework.integration.handler.LoggingHandler: org.springframework.messaging.MessagingException: nested exception is java.lang.IllegalStateException: S3 client with invalid S3 endpoint configured: localhost:9001
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:342)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null(AbstractPollingEndpoint.java:275)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller(AbstractPollingEndpoint.java:272)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalStateException: S3 client with invalid S3 endpoint configured: localhost:9001
at com.amazonaws.services.s3.AmazonS3Client.getRegion(AmazonS3Client.java:4270)
at org.springframework.integration.aws.support.S3Session.getHostPort(S3Session.java:228)
at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.doReceive(AbstractRemoteFileStreamingMessageSource.java:214)
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:167)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:250)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:359)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:328)
This happens because spring-integration-aws sets some headers on the message when a file is received:
AbstractRemoteFileStreamingMessageSource.java
return getMessageBuilderFactory()
        .withPayload(session.readRaw(remotePath))
        .setHeader(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE, session)
        .setHeader(FileHeaders.REMOTE_DIRECTORY, file.getRemoteDirectory())
        .setHeader(FileHeaders.REMOTE_FILE, file.getFilename())
        .setHeader(FileHeaders.REMOTE_HOST_PORT, session.getHostPort())
        .setHeader(FileHeaders.REMOTE_FILE_INFO,
                this.fileInfoJson ? file.toJson() : file);
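The REMOTE_HOST_PORT header is populated by the getHostPort() method. getHostPort() in S3Session.java calls getRegion(). The getRegion() method in AmazonS3Client does not check whether the user has set a value in the signing region field. It only checks whether the host matches the "amazonaws.com" pattern.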
// S3Session.java
@Override
public String getHostPort() {
    Region region = this.amazonS3.getRegion().toAWSRegion();
    return String.format("%s.%s.%s:%d", AmazonS3.ENDPOINT_PREFIX, region.getName(), region.getDomain(), 443);
}

// AmazonS3Client.java
@Override
public synchronized Region getRegion() {
    String authority = super.endpoint.getAuthority();
    if (Constants.S3_HOSTNAME.equals(authority)) {
        return Region.US_Standard;
    } else {
        Matcher m = Region.S3_REGIONAL_ENDPOINT_PATTERN.matcher(authority);
        if (m.matches()) {
            return Region.fromValue(m.group(1));
        } else {
            // A custom endpoint such as localhost:9001 ends up here
            throw new IllegalStateException(
                    "S3 client with invalid S3 endpoint configured: " + authority);
        }
    }
}
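To make the failure concrete, here is a minimal, self-contained sketch of the kind of check getRegion() performs on the endpoint authority. The regular expression is an assumed stand-in for Region.S3_REGIONAL_ENDPOINT_PATTERN (the SDK's exact pattern may differ), and the class and method names are hypothetical:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class EndpointPatternDemo {

    // Assumed approximation of Region.S3_REGIONAL_ENDPOINT_PATTERN; not copied from the SDK.
    static final Pattern REGIONAL_ENDPOINT =
            Pattern.compile("s3[-.]([^.]+)\\.amazonaws\\.com(\\.[^.]*)?");

    // Returns the region captured from the authority, or null when the
    // authority does not look like an AWS regional S3 endpoint.
    static String regionOf(String authority) {
        Matcher m = REGIONAL_ENDPOINT.matcher(authority);
        return m.matches() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        System.out.println(regionOf("s3.eu-west-1.amazonaws.com")); // prints "eu-west-1"
        System.out.println(regionOf("localhost:9001"));             // prints "null"
    }
}
```

Under that assumption, a minio authority such as localhost:9001 can never match, whatever signing region was passed to EndpointConfiguration, which is consistent with the IllegalStateException in the stack trace.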
How can I poll from S3 with a custom endpoint configuration?
Why doesn't the getHostPort() method check the signing region value? Is there a way to work around this?
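Yes, there is a workaround.
Extend S3SessionFactory so that it returns an extension of S3Session, and override the getHostPort() method in that session for your custom endpoint. Then pass your factory to S3RemoteFileTemplate in place of the stock S3SessionFactory.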
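// MyS3SessionFactory.java
import com.amazonaws.services.s3.AmazonS3;
import org.springframework.integration.aws.support.S3Session;
import org.springframework.integration.aws.support.S3SessionFactory;
import org.springframework.util.Assert;

public class MyS3SessionFactory extends S3SessionFactory {
    private final MyS3Session s3Session;

    public MyS3SessionFactory(AmazonS3 amazonS3) {
        super(amazonS3);
        Assert.notNull(amazonS3, "'amazonS3' must not be null.");
        this.s3Session = new MyS3Session(amazonS3);
    }

    // Hand out the custom session instead of the stock S3Session
    @Override
    public S3Session getSession() {
        return s3Session;
    }
}

// MyS3Session.java
import com.amazonaws.services.s3.AmazonS3;
import org.springframework.integration.aws.support.S3Session;

public class MyS3Session extends S3Session {
    public MyS3Session(AmazonS3 amazonS3) {
        super(amazonS3);
    }

    // Skip the AmazonS3Client.getRegion() lookup, which rejects non-AWS endpoints
    @Override
    public String getHostPort() {
        return "";
    }
}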
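The override above returns an empty string, which is enough to get past the exception. If something downstream actually reads the REMOTE_HOST_PORT header, one option is to derive the value from the custom endpoint URL instead. The following is only a sketch: EndpointHostPort and hostPortOf are hypothetical names, and the default ports are an assumption for endpoints that omit one.

```java
import java.net.URI;

final class EndpointHostPort {

    // Hypothetical helper: turns a custom endpoint URL into "host:port".
    static String hostPortOf(String endpointUrl) {
        URI uri = URI.create(endpointUrl);
        String host = uri.getHost();
        if (host == null) {
            throw new IllegalArgumentException("No host in endpoint: " + endpointUrl);
        }
        int port = uri.getPort();
        if (port == -1) {
            // Assumption: fall back to the scheme's conventional port
            port = "https".equalsIgnoreCase(uri.getScheme()) ? 443 : 80;
        }
        return host + ":" + port;
    }

    public static void main(String[] args) {
        System.out.println(hostPortOf("http://localhost:9001")); // prints "localhost:9001"
    }
}
```

With a helper like this, getHostPort() in MyS3Session could return hostPortOf("http://localhost:9001"), with the endpoint string taken from the same place that configures the AmazonS3 bean.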
Let's discuss a possible fix in the issue you raised: https://github.com/spring-projects/spring-integration-aws/issues/160