Spring Batch and Boot Micrometer

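I have a Spring Batch job that reads records from a database, processes them, and writes them to another database. I want to measure the total and average time spent in the Spring Batch processor and writer. According to the documentation (https://docs.spring.io/spring-batch/docs/current/reference/html/monitoring-and-metrics.html), metrics are readily available under the spring.batch prefix. How can I log the time taken by the processor and the writer to the console? When I print the metrics from the metrics registry at the end of the job in a job listener, I see the job and step status as Completed but the Timer stats metrics are showing as 0. Is there anything I need to do to enable the timers? I don't intend to push the metrics to prometheus or atlas or any other registries, so I have only added the micrometer-core dependency to pom.xml. Please advise how to log the timing metrics for the different components. Any example would be very helpful.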

I see the job and step status as Completed but the Timer stats metrics are showing as 0.

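The reason is that by default, the global registry in Micrometer is an empty composite. You need to add at least one registry to it for the metrics to be retained.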
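To make the "empty composite" point concrete, here is a simplified stand-in written with plain JDK classes only. It is not Micrometer's code: `ToyRegistry`, `ToyCompositeRegistry` and `EmptyCompositeDemo` are hypothetical names I made up for illustration, and the model assumes only that a composite forwards recordings to its children and keeps nothing itself.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only (assumption: NOT Micrometer's implementation, all names are hypothetical).
class EmptyCompositeDemo {
    public static void main(String[] args) {
        ToyCompositeRegistry global = new ToyCompositeRegistry();

        // No child registry yet: this recording has nowhere to go and is dropped.
        global.record(2.0);
        System.out.println("before add: count = " + global.count());

        // Once a child is added, later recordings are retained.
        global.add(new ToyRegistry());
        global.record(2.0);
        global.record(2.0);
        System.out.println("after add: count = " + global.count()
                + ", total = " + global.totalSeconds());
    }
}

// A concrete registry that actually stores what is recorded.
class ToyRegistry {
    long count;
    double totalSeconds;

    void record(double seconds) {
        count++;
        totalSeconds += seconds;
    }
}

// A composite forwards every recording to its children and stores nothing itself.
class ToyCompositeRegistry {
    private final List<ToyRegistry> children = new ArrayList<>();

    void add(ToyRegistry child) {
        children.add(child);
    }

    void record(double seconds) {
        for (ToyRegistry child : children) {
            child.record(seconds);
        }
    }

    // Reads from the first child, or reports 0 when the composite is empty.
    long count() {
        return children.isEmpty() ? 0 : children.get(0).count;
    }

    double totalSeconds() {
        return children.isEmpty() ? 0.0 : children.get(0).totalSeconds;
    }
}
```

In this toy model the first recording is lost and the count reads 0 until a child registry is added, which matches the symptom in the question: the job completes, but the timers have nothing behind them.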

I don't intend to push the metrics to prometheus or atlas or any other registries.

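If you want to consume the metrics without pushing them to a metrics backend, you can use a listener and access them directly from the global registry. Here is a quick example for item processing timings, as you asked: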

import java.util.Arrays;
import java.util.List;

import io.micrometer.core.instrument.Measurement;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

import org.springframework.batch.core.ItemProcessListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class MyJob {

    @Bean
    public ItemReader<Integer> itemReader() {
        return new ListItemReader<>(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9));
    }

    @Bean
    public ItemProcessor<Integer, Integer> itemProcessor() {
        return item -> {
            System.out.println("processing item " + item);
            Thread.sleep(2000);
            return item + 1;
        };
    }

    @Bean
    public ItemWriter<Integer> itemWriter() {
        return items -> {
            for (Integer item : items) {
                System.out.println("writing item = " + item);
            }
        };
    }

    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .<Integer, Integer>chunk(5)
                        .reader(itemReader())
                        .processor(itemProcessor())
                        .writer(itemWriter())
                        .listener(new MonitoringItemProcessListener())
                        .build())
                .build();
    }
    
    static class MonitoringItemProcessListener implements ItemProcessListener<Integer, Integer> {

        @Override
        public void beforeProcess(Integer item) {
            // nothing to do before the item is processed
        }

        @Override
        public void afterProcess(Integer item, Integer result) {
            List<Meter> meters = Metrics.globalRegistry.getMeters();
            for (Meter meter : meters) {
                if (meter.getId().getName().equals("spring.batch.item.process")) {
                    System.out.println("meter description = " + meter.getId().getDescription());
                    Iterable<Measurement> measurements = meter.measure();
                    for (Measurement measurement : measurements) {
                        System.out.println("measurement: statistic = " + measurement.getStatistic() + " | value = " + measurement.getValue());
                    }
                }
            }
        }

        @Override
        public void onProcessError(Integer item, Exception e) {

        }
    }

    public static void main(String[] args) throws Exception {
        // The global registry is an empty composite by default: add a registry so the timers are retained
        Metrics.addRegistry(new SimpleMeterRegistry());
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

}

This example prints something like:

processing item 1
meter description = Item processing duration
measurement: statistic = COUNT | value = 1.0
measurement: statistic = TOTAL_TIME | value = 2.00080715
measurement: statistic = MAX | value = 2.00080715
processing item 2
meter description = Item processing duration
measurement: statistic = COUNT | value = 2.0
measurement: statistic = TOTAL_TIME | value = 4.003516877
measurement: statistic = MAX | value = 2.002709727
processing item 3
meter description = Item processing duration
measurement: statistic = COUNT | value = 3.0
measurement: statistic = TOTAL_TIME | value = 6.005287923
measurement: statistic = MAX | value = 2.002709727
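The question also asked for the average time, which is not printed directly in the output above. One way to get it is to divide TOTAL_TIME by COUNT. Here is a minimal sketch of that arithmetic in plain Java; `TimerMean` and `meanSeconds` are hypothetical names used only for illustration, and the inputs are copied from the last sample above.

```java
// Hypothetical helper (not part of Spring Batch or Micrometer): derives the mean
// duration from the COUNT and TOTAL_TIME statistics of a timer.
class TimerMean {

    // Returns 0 when nothing has been recorded, to avoid dividing by zero.
    static double meanSeconds(double count, double totalTimeSeconds) {
        return count == 0 ? 0.0 : totalTimeSeconds / count;
    }

    public static void main(String[] args) {
        // COUNT = 3.0 and TOTAL_TIME = 6.005287923 come from the last sample above
        double mean = meanSeconds(3.0, 6.005287923);
        System.out.println("mean processing time in seconds = " + mean);
    }
}
```

For the sample above this comes out at roughly 2.0 seconds per item, which is what you would expect with the `Thread.sleep(2000)` in the processor.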