有人可以提供 spring-integration-aws SQS 用法的示例吗?
Could somebody provide en example of spring-integration-aws SQS usage?
Here 是 spring-integration-aws
项目。他们提供了有关 Inbound Channle 适配器的示例:
@SpringBootApplication
public static class MyConfiguration {
@Autowired
private AmazonSQSAsync amazonSqs;
@Bean
public PollableChannel inputChannel() {
return new QueueChannel();
}
@Bean
public MessageProducer sqsMessageDrivenChannelAdapter() {
SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(this.amazonSqs, "myQueue");
adapter.setOutputChannel(inputChannel());
return adapter;
}
}
好了,Channel
和SqsMessageDrivenChannelAdapter
都定义好了,但是接下来呢?假设我有 spring 个这样的 bean:
import com.amazonaws.services.sqs.model.Message;
@Component
public class MyComponent {
public void onMessage(Message message) throws Exception {
//handle sqs message
}
}
- 如何
tell
spring 将来自 myQueue
的所有消息传递给此
组件?
- 是否有任何额外的配置来处理一条消息
一?例如在收到消息
SQS
后将它们标记为
处理并且它们对其他客户端不可见,所以它是
只需要获取一条消息,然后处理并获取下一条消息。做
默认启用此行为?
你应该阅读 Spring Integration Reference Manual。
@Component
public class MyComponent {
@ServiceActivator(inputChannel = "inputChannel")
public void onMessage(Message message) throws Exception {
//handle sqs message
}
}
回答你的第二个问题:
/**
* Configure the maximum number of messages that should be retrieved during one poll to the Amazon SQS system. This
* number must be a positive, non-zero number that has a maximum number of 10. Values higher then 10 are currently
* not supported by the queueing system.
*
* @param maxNumberOfMessages
* the maximum number of messages (between 1-10)
*/
public void setMaxNumberOfMessages(Integer maxNumberOfMessages) {
this.maxNumberOfMessages = maxNumberOfMessages;
}
默认为10
。
关于mark them as processing
的问题可以通过SqsMessageDeletionPolicy
选项实现:
/**
* Never deletes message automatically. The receiving listener method must acknowledge each message manually by using
* the acknowledgment parameter.
* <p><b>IMPORTANT</b>: When using this policy the listener method must take care of the deletion of the messages.
* If not, it will lead to an endless loop of messages (poison messages).</p>
*
* @see Acknowledgment
*/
NEVER,
这样的 Acknowledgment
对象被放置在 AwsHeaders.ACKNOWLEDGMENT
消息头中,您可以从 onMessage()
方法中获取并在需要时确认它。
Here 是 spring-integration-aws
项目。他们提供了有关 Inbound Channle 适配器的示例:
@SpringBootApplication
public static class MyConfiguration {
@Autowired
private AmazonSQSAsync amazonSqs;
@Bean
public PollableChannel inputChannel() {
return new QueueChannel();
}
@Bean
public MessageProducer sqsMessageDrivenChannelAdapter() {
SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(this.amazonSqs, "myQueue");
adapter.setOutputChannel(inputChannel());
return adapter;
}
}
好了,Channel
和SqsMessageDrivenChannelAdapter
都定义好了,但是接下来呢?假设我有 spring 个这样的 bean:
import com.amazonaws.services.sqs.model.Message;
@Component
public class MyComponent {
public void onMessage(Message message) throws Exception {
//handle sqs message
}
}
- 如何
tell
spring 将来自myQueue
的所有消息传递给此 组件? - 是否有任何额外的配置来处理一条消息
一?例如在收到消息
SQS
后将它们标记为 处理并且它们对其他客户端不可见,所以它是 只需要获取一条消息,然后处理并获取下一条消息。做 默认启用此行为?
你应该阅读 Spring Integration Reference Manual。
@Component
public class MyComponent {
@ServiceActivator(inputChannel = "inputChannel")
public void onMessage(Message message) throws Exception {
//handle sqs message
}
}
回答你的第二个问题:
/**
* Configure the maximum number of messages that should be retrieved during one poll to the Amazon SQS system. This
* number must be a positive, non-zero number that has a maximum number of 10. Values higher then 10 are currently
* not supported by the queueing system.
*
* @param maxNumberOfMessages
* the maximum number of messages (between 1-10)
*/
public void setMaxNumberOfMessages(Integer maxNumberOfMessages) {
this.maxNumberOfMessages = maxNumberOfMessages;
}
默认为10
。
关于mark them as processing
的问题可以通过SqsMessageDeletionPolicy
选项实现:
/**
* Never deletes message automatically. The receiving listener method must acknowledge each message manually by using
* the acknowledgment parameter.
* <p><b>IMPORTANT</b>: When using this policy the listener method must take care of the deletion of the messages.
* If not, it will lead to an endless loop of messages (poison messages).</p>
*
* @see Acknowledgment
*/
NEVER,
这样的 Acknowledgment
对象被放置在 AwsHeaders.ACKNOWLEDGMENT
消息头中,您可以从 onMessage()
方法中获取并在需要时确认它。