为批处理任务实施通用进度跟踪器

Implementing A Generic Progress Tracker For Batched Tasks

我目前正在从事一个项目,该项目有许多后台任务需要执行。创建的每个任务然后发送到一个并发服务,该服务管理所有任务的执行过程。每个任务在其执行期间都存储在数据库中 table。

我的困境是每个任务都执行非常具体的功能,并且通常大多数委托给系统另一部分的服务,首当其冲的工作完成,然后是任务returns。

目前我实现了一个非常简单的系统来跟踪任务的进度,它运行良好,但是执行的每个任务都需要添加很多额外的代码以适应它委托给的服务的功能。

举个例子,我的任务有一个方法:

@Override
public void execute() {
    service.calculateAverage();
}

然后在服务中对应:

public float calculateAverage() {
    float total = 0.0f; 
    for (i = 0; i < 20; i++) {
        total += i;
    }
    return total / 20;
}

跟踪进度非常简单,我只是在超过一定的迭代阈值后更新数据库中的任务。然而,事实证明,将其泛化是一项艰巨的任务,因为执行的每个任务都可能完全委托给不同的服务。这意味着在每个服务中我需要添加特定于该服务实现的代码。

我进行了一些搜索,但似乎找不到任何可以帮助创建用于跟踪每个任务进度的通用系统的好的模式。任何指针,甚至只是可以查看或阅读的地方都会很好。

如果您使用自己的服务 Iterator 而不是让它创建循环。

class SumService {

    private float calculateSum(Iterable<Integer> i) {
        float total = 0.0f;
        for (Integer x : i) {
            total += x;
        }
        return total;
    }

}

然后您可以创建一个 Iterable 来跟踪进度并将其报告给进度跟踪器。

/**
 * State of progress - returns a double result between 0 and 1.
 *
 * Will be called repeatedly by the progress tracker.
 */
interface Progress {

    public double getProgress();
}

/**
 * The progress tracker.
 */
static class ProgressTracker {

    // All processes are registered.
    static void registerProgressor(Progress p) {
        // Add it to mmy list of people to watch.
    }
}

/**
 * An Iterable that reports its progress.
 */
class ProgressingIterable<T> implements Iterable<T>, Progress {

    // The iterable we are hosting.
    final Iterable<T> it;
    // How far we are to go.
    final int steps;
    // Where we're at now.
    volatile int at = 0;

    public ProgressingIterable(Iterable<T> it, int steps) {
        this.it = it;
        this.steps = steps;
    }

    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {
            // Grab an Iterator from the Iterable.
            Iterator<T> i = it.iterator();

            @Override
            public boolean hasNext() {
                // Delegate.
                return i.hasNext();
            }

            @Override
            public T next() {
                // Keep track of the steps.
                at++;
                return i.next();
            }

        };
    }

    @Override
    public double getProgress() {
        // How are we doing?
        return (double) at / (double) steps;
    }

}

/**
 * A range (from 
 *
 * @param begin inclusive
 * @param end exclusive
 * @return list of integers from begin to end
 */
public static List<Integer> range(final int begin, final int end) {
    return new AbstractList<Integer>() {
        @Override
        public Integer get(int index) {
            return begin + index;
        }

        @Override
        public int size() {
            return end - begin;
        }
    };
}

/**
 * A process.
 */
class Process {

    ProgressingIterable<Integer> progress = new ProgressingIterable<>(range(0, 20), 20);

    public void execute() {
        // Register the Progress
        ProgressTracker.registerProgressor(progress);
        // Make the service use my progress object.
        service.calculateSum(progress);
    }

}

// The service it uses.
SumService service = new SumService();

这管理职责分离。对服务来说只是一个 Iterable,而对进度跟踪器来说,它会在被询问时提供当前进度。

我称其为 Janus 模式,因为您有一个对象恰好做两件事。它允许您将两个进程绑定到一个对象中。

我选择了最简单的进度指标 - 01 之间的 double。我相信你可以做得更好。