spring-amqp 非线程安全侦听器的消费者利用率为零

spring-amqp zero consumer utilization with non thread-safe listener

我们在生产中遇到了一个问题,消费者的利用率为零,队列不断增长,性能下降。

每个消费者都是一个容器,其中包含非线程安全侦听器 bean 的单个实例。

每个侦听器都需要写入自己的一组文件。为了避免线程争用,我希望只有一个线程写入它自己的一组文件。

每个侦听器仅使用@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)实例化一次

我使用的配置与此 question

中的类似

每个容器还配置了重试建议,其代码如下:

public class RetryMessageAdvice extends StatelessRetryOperationsInterceptorFactoryBean {
    private static final int DEFAULT_RETRY_COUNT = 5;
    private static final int DEFAULT_BACKOFF_MS = 250;
    private int retryCount;
    private int backOffPeriodInMS;

    public RetryMessageAdvice() {
        this.retryCount = DEFAULT_RETRY_COUNT;
        this.backOffPeriodInMS = DEFAULT_BACKOFF_MS;
        initializeRetryPolicy();
    }

    public RetryMessageAdvice(int retryCount, int backoff) {
        this.retryCount = retryCount;
        this.backOffPeriodInMS = backoff;
        initializeRetryPolicy();
    }

    public void initializeRetryPolicy() {

        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(this.retryCount);

        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(backOffPeriodInMS);

        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy);
        retryTemplate.setBackOffPolicy(backOffPolicy);

        this.setRetryOperations(retryTemplate);
        this.setMessageRecoverer(new RetryMessageRecoverer());
    }

    public int getRetryCount() {
        return retryCount;
    }

    public void setRetryCount(int retryCount) {
        this.retryCount = retryCount;
    }
}

消费者看起来像这样:

@Component("exportListenerImpl")
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class ExportListenerImpl extends ExportListenerBase {
    private static final Logger LOG = LoggerFactory.getLogger(ExportListenerImpl.class);

    private final ExportMapper exportMapper;
    private final ExportFormatter exportFormatter;

    @Autowired
    public ExportListenerImpl(@Qualifier("exportFormatter") ExportFormatter exportFormatter,
            @Qualifier("exportMapper") ExportedMapper exportedMapper,
            @Value("${export.root.dir}") String exportDirectory) {
        super(exportDirectory);
        this.exportedFormatter = exportFormatter;
        this.exportedMapper = exportedMapper;
    }

    @Override
    public void handle(AnalyticsEvent analyticsEvent) throws Exception {

        ExportedEvent exportedEvent = exportMapper.mapPlace(analyticsEvent);

        File csvFile = getCsvFile(exportedEvent);
        String csvRow = exportFormatter.writeAsString(exportedEvent);
        writeCsvRow(csvRow, csvFile);
    }
}

其他注意事项

  1. 导出映射器和导出格式化器是线程安全的,但不使用@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  2. 方法 writeCsvRow 同步
  3. 大量错误导致 exportMapper 抛出异常并触发重试建议
  4. 来信速率为120/s
  5. 收货率与送货率的比率通常为5:1

我的错误理论是

  1. 大量错误导致大量重试和 降低性能。我最好不要把坏消息 在错误队列中。
  2. 不知何故,writeCsvRow 中的同步方法是 导致一些更高级别的线程出现问题 spring-amqp.

我的问题是,哪种理论是正确的?重试建议的影响是问题吗?

  1. 如果这些 bean 也不是线程安全的,那么它们也必须是原型作用域。
  2. 由于只有一个线程,因此不需要同步该方法,但应该不会有什么坏处。
  3. 如果错误无法恢复,您应该配置重试策略以不重试这些异常。

.

  1. 使用这些重试设置,您将在每次遇到错误时暂停容器线程 250 毫秒。所以,是的;这会影响性能。
  2. 应该不是很大的开销。