spring-amqp 非线程安全侦听器的消费者利用率为零
spring-amqp zero consumer utilization with non thread-safe listener
我们在生产中遇到了一个问题,消费者的利用率为零,队列不断增长,性能下降。
每个消费者都是一个容器,其中包含非线程安全侦听器 bean 的单个实例。
每个侦听器都需要写入自己的一组文件。为了避免线程争用,我希望只有一个线程写入它自己的一组文件。
每个侦听器仅使用@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)实例化一次
我使用的配置与此 question
中的类似
每个容器还配置了重试建议,其代码如下:
public class RetryMessageAdvice extends StatelessRetryOperationsInterceptorFactoryBean {
private static final int DEFAULT_RETRY_COUNT = 5;
private static final int DEFAULT_BACKOFF_MS = 250;
private int retryCount;
private int backOffPeriodInMS;
public RetryMessageAdvice() {
this.retryCount = DEFAULT_RETRY_COUNT;
this.backOffPeriodInMS = DEFAULT_BACKOFF_MS;
initializeRetryPolicy();
}
public RetryMessageAdvice(int retryCount, int backoff) {
this.retryCount = retryCount;
this.backOffPeriodInMS = backoff;
initializeRetryPolicy();
}
public void initializeRetryPolicy() {
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(this.retryCount);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(backOffPeriodInMS);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(retryPolicy);
retryTemplate.setBackOffPolicy(backOffPolicy);
this.setRetryOperations(retryTemplate);
this.setMessageRecoverer(new RetryMessageRecoverer());
}
public int getRetryCount() {
return retryCount;
}
public void setRetryCount(int retryCount) {
this.retryCount = retryCount;
}
}
消费者看起来像这样:
@Component("exportListenerImpl")
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class ExportListenerImpl extends ExportListenerBase {
private static final Logger LOG = LoggerFactory.getLogger(ExportListenerImpl.class);
private final ExportMapper exportMapper;
private final ExportFormatter exportFormatter;
@Autowired
public ExportListenerImpl(@Qualifier("exportFormatter") ExportFormatter exportFormatter,
@Qualifier("exportMapper") ExportedMapper exportedMapper,
@Value("${export.root.dir}") String exportDirectory) {
super(exportDirectory);
this.exportedFormatter = exportFormatter;
this.exportedMapper = exportedMapper;
}
@Override
public void handle(AnalyticsEvent analyticsEvent) throws Exception {
ExportedEvent exportedEvent = exportMapper.mapPlace(analyticsEvent);
File csvFile = getCsvFile(exportedEvent);
String csvRow = exportFormatter.writeAsString(exportedEvent);
writeCsvRow(csvRow, csvFile);
}
}
其他注意事项
- 导出映射器和导出格式化器是线程安全的,但不使用@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
- 方法 writeCsvRow 同步。
- 大量错误导致 exportMapper 抛出异常并触发重试建议
- 来信速率为120/s
- 收货率与送货率的比率通常为5:1
我的错误理论是
- 大量错误导致大量重试和
降低性能。我最好不要把坏消息
在错误队列中。
- 不知何故,writeCsvRow 中的同步方法是
导致一些更高级别的线程出现问题
spring-amqp.
我的问题是,哪种理论是正确的?重试建议的影响是问题吗?
- 如果这些 bean 也不是线程安全的,那么它们也必须是原型作用域。
- 由于只有一个线程,因此不需要同步该方法,但应该不会有什么坏处。
- 如果错误无法恢复,您应该配置重试策略以不重试这些异常。
.
- 使用这些重试设置,您将在每次遇到错误时暂停容器线程 250 毫秒。所以,是的;这会影响性能。
- 应该不是很大的开销。
我们在生产中遇到了一个问题,消费者的利用率为零,队列不断增长,性能下降。
每个消费者都是一个容器,其中包含非线程安全侦听器 bean 的单个实例。
每个侦听器都需要写入自己的一组文件。为了避免线程争用,我希望只有一个线程写入它自己的一组文件。
每个侦听器仅使用@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)实例化一次
我使用的配置与此 question
中的类似每个容器还配置了重试建议,其代码如下:
public class RetryMessageAdvice extends StatelessRetryOperationsInterceptorFactoryBean {
private static final int DEFAULT_RETRY_COUNT = 5;
private static final int DEFAULT_BACKOFF_MS = 250;
private int retryCount;
private int backOffPeriodInMS;
public RetryMessageAdvice() {
this.retryCount = DEFAULT_RETRY_COUNT;
this.backOffPeriodInMS = DEFAULT_BACKOFF_MS;
initializeRetryPolicy();
}
public RetryMessageAdvice(int retryCount, int backoff) {
this.retryCount = retryCount;
this.backOffPeriodInMS = backoff;
initializeRetryPolicy();
}
public void initializeRetryPolicy() {
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(this.retryCount);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(backOffPeriodInMS);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(retryPolicy);
retryTemplate.setBackOffPolicy(backOffPolicy);
this.setRetryOperations(retryTemplate);
this.setMessageRecoverer(new RetryMessageRecoverer());
}
public int getRetryCount() {
return retryCount;
}
public void setRetryCount(int retryCount) {
this.retryCount = retryCount;
}
}
消费者看起来像这样:
@Component("exportListenerImpl")
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class ExportListenerImpl extends ExportListenerBase {
private static final Logger LOG = LoggerFactory.getLogger(ExportListenerImpl.class);
private final ExportMapper exportMapper;
private final ExportFormatter exportFormatter;
@Autowired
public ExportListenerImpl(@Qualifier("exportFormatter") ExportFormatter exportFormatter,
@Qualifier("exportMapper") ExportedMapper exportedMapper,
@Value("${export.root.dir}") String exportDirectory) {
super(exportDirectory);
this.exportedFormatter = exportFormatter;
this.exportedMapper = exportedMapper;
}
@Override
public void handle(AnalyticsEvent analyticsEvent) throws Exception {
ExportedEvent exportedEvent = exportMapper.mapPlace(analyticsEvent);
File csvFile = getCsvFile(exportedEvent);
String csvRow = exportFormatter.writeAsString(exportedEvent);
writeCsvRow(csvRow, csvFile);
}
}
其他注意事项
- 导出映射器和导出格式化器是线程安全的,但不使用@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
- 方法 writeCsvRow 同步。
- 大量错误导致 exportMapper 抛出异常并触发重试建议
- 来信速率为120/s
- 收货率与送货率的比率通常为5:1
我的错误理论是
- 大量错误导致大量重试和 降低性能。我最好不要把坏消息 在错误队列中。
- 不知何故,writeCsvRow 中的同步方法是 导致一些更高级别的线程出现问题 spring-amqp.
我的问题是,哪种理论是正确的?重试建议的影响是问题吗?
- 如果这些 bean 也不是线程安全的,那么它们也必须是原型作用域。
- 由于只有一个线程,因此不需要同步该方法,但应该不会有什么坏处。
- 如果错误无法恢复,您应该配置重试策略以不重试这些异常。
.
- 使用这些重试设置,您将在每次遇到错误时暂停容器线程 250 毫秒。所以,是的;这会影响性能。
- 应该不是很大的开销。