如何使用 Micrometer 获取 Spring 集成队列中的消息数?

How do I get the number of messages in a Spring Integration queue with Micrometer?

Spring的Metrics and Management: MessageChannel Metric Features描述:

If it is a QueueChannel, you also see statistics for the receive operation as well as the count of messages that are currently buffered by this QueueChannel

但是:

These legacy metrics will be removed in a future release. See Micrometer Integration.

描述的地方:

The Counter Meters for receive operations on pollable message channels have the following names or tags: name: spring.integration.receive [...]

这听起来好像只计算收到了多少条消息。队列中的消息数好像没有,即使计算receive - send也没有(因为没有send)。

那么,使用 Spring Integration 和 Micrometer,甚至可以读取队列大小吗?怎么样?

我认为我们需要为此队列大小值注册一个 Gauge

/**
 * A gauge tracks a value that may go up or down. The value that is published for gauges is
 * an instantaneous sample of the gauge at publishing time.
 *
 * @author Jon Schneider
 */
public interface Gauge extends Meter {

这是一个相应的构建器:

/**
 * A convenience method for building a gauge from a supplying function, holding a strong
 * reference to this function.
 *
 * @param name The gauge's name.
 * @param f    A function that yields a double value for the gauge.
 * @return A new gauge builder.
 * @since 1.1.0
 */
@Incubating(since = "1.1.0")
static Builder<Supplier<Number>> builder(String name, Supplier<Number> f) {

欢迎提出 GH 问题来改进!

目前可能可以通过具有 QueueChannel.getQueueSize() 委托的外部 Gauge 实例实现。

直到Spring Integration 5.4发布,这个可以用:

import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.support.management.metrics.MetricsCaptor;
import org.springframework.integration.support.management.micrometer.MicrometerMetricsCaptor;

import java.util.Collection;

@Configuration
@RequiredArgsConstructor
public class IntegrationMetricConfig {

  private final ApplicationContext applicationContext;
  private final Collection<QueueChannel> queues;

  /**
   * Temporary solution until <a href="https://github.com/spring-projects/spring-integration/pull/3349/files">Add
   * gauges for queue channel size</a> is released. Remove this when updating to Spring Integration
   * 5.4.
   */
  @Autowired
  public void queueSizeGauges() {
    MetricsCaptor metricsCaptor = MicrometerMetricsCaptor.loadCaptor(this.applicationContext);
    queues.forEach(queue -> {
      metricsCaptor.gaugeBuilder("spring.integration.channel.queue.size", queue,
        obj -> queue.getQueueSize())
        .tag("name", queue.getComponentName() == null ? "unknown" : queue.getComponentName())
        .tag("type", "channel")
        .description("The size of the queue channel")
        .build();

      metricsCaptor.gaugeBuilder("spring.integration.channel.queue.remaining.capacity", this,
        obj -> queue.getQueueSize())
        .tag("name", queue.getComponentName() == null ? "unknown" : queue.getComponentName())
        .tag("type", "channel")
        .description("The remaining capacity of the queue channel")
        .build();
    });
  }
}